You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 10:56:49 UTC

[inlong] branch branch-1.6 updated (15f949e7a -> 1643d8230)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from 15f949e7a [INLONG-7601][Release] Change the tag of Docker images to 1.6.0 (#7624)
     new 553b92d04 [INLONG-7614][Sort] Fix sort pulsar connector loss data when using admin url (#7623)
     new 53f57f746 [INLONG-7625][Manager] Make data encoding type as a common property of StreamSink (#7628)
     new ee9aaf10d [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)
     new 1643d8230 [INLONG-7626][Docker] Optimize the docker push workflow (#7627)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci_docker.yml                    | 20 ++++++++
 docker/build-docker-images.sh                      |  4 +-
 docker/publish-by-arch.sh                          | 24 +++-------
 .../inlong/manager/common/enums/GroupStatus.java   |  8 ++++
 .../dao/mapper/StreamSourceEntityMapper.java       | 16 +++++++
 .../resources/mappers/StreamSourceEntityMapper.xml | 42 ++++++++++++++++-
 .../inlong/manager/pojo/sink/StreamSink.java       |  4 ++
 .../inlong/manager/pojo/sink/hive/HiveSink.java    |  3 --
 .../service/node/AbstractDataNodeOperator.java     | 29 +++++++++++-
 .../manager/service/node/DataNodeOperator.java     |  4 +-
 .../manager/service/node/DataNodeServiceImpl.java  |  3 +-
 .../service/node/mysql/MySQLDataNodeOperator.java  | 15 ++++++
 .../pulsar/{ => internal}/FlinkPulsarSource.java   | 16 +++----
 .../FlinkPulsarSourceWithoutAdmin.java}            | 34 +++++--------
 .../{withoutadmin => internal}/PulsarFetcher.java  | 55 +++++++++++++++++++++-
 .../PulsarMetadataReader.java                      | 50 +++++++++++++-------
 .../{withoutadmin => internal}/ReaderThread.java   |  2 +-
 .../pulsar/table/PulsarDynamicTableSource.java     | 10 ++--
 .../pulsar/withoutadmin/CallbackCollector.java     | 47 ------------------
 licenses/inlong-sort-connectors/LICENSE            | 11 +++--
 20 files changed, 260 insertions(+), 137 deletions(-)
 rename inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/{ => internal}/FlinkPulsarSource.java (98%)
 rename inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/{withoutadmin/FlinkPulsarSource.java => internal/FlinkPulsarSourceWithoutAdmin.java} (97%)
 rename inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/{withoutadmin => internal}/PulsarFetcher.java (94%)
 rename inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/{withoutadmin => internal}/PulsarMetadataReader.java (84%)
 rename inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/{withoutadmin => internal}/ReaderThread.java (99%)
 delete mode 100644 inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java


[inlong] 02/04: [INLONG-7625][Manager] Make data encoding type as a common property of StreamSink (#7628)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 53f57f746612183b0a07a9673f70076c814fc510
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Mar 16 16:26:56 2023 +0800

    [INLONG-7625][Manager] Make data encoding type as a common property of StreamSink (#7628)
---
 .../src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java | 4 ++++
 .../main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSink.java  | 3 ---
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
index cbd6195a1..65aaf011c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/StreamSink.java
@@ -115,6 +115,10 @@ public abstract class StreamSink extends StreamNode {
     @ApiModelProperty("Properties for sink")
     private Map<String, Object> properties = Maps.newHashMap();
 
+    @Builder.Default
+    @ApiModelProperty("Encoding type for sink, default is UTF-8")
+    private String dataEncoding = "UTF-8";
+
     @JsonIgnore
     @Builder.Default
     @ApiModelProperty("Data format type for stream sink")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSink.java
index 63a410ae8..4fa4ff8a2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSink.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSink.java
@@ -74,9 +74,6 @@ public class HiveSink extends StreamSink {
     @ApiModelProperty("File format, support: TextFile, ORCFile, RCFile, SequenceFile, Avro, Parquet, etc")
     private String fileFormat;
 
-    @ApiModelProperty("Data encoding format: UTF-8, GBK")
-    private String dataEncoding;
-
     @ApiModelProperty("Data separator")
     private String dataSeparator;
 


[inlong] 04/04: [INLONG-7626][Docker] Optimize the docker push workflow (#7627)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1643d8230bf4b585035b661c3cab2b0111ae5c41
Author: Lucas <10...@users.noreply.github.com>
AuthorDate: Fri Mar 17 14:41:13 2023 +0800

    [INLONG-7626][Docker] Optimize the docker push workflow (#7627)
---
 .github/workflows/ci_docker.yml | 20 ++++++++++++++++++++
 docker/build-docker-images.sh   |  4 +---
 docker/publish-by-arch.sh       | 24 ++++++------------------
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/ci_docker.yml b/.github/workflows/ci_docker.yml
index 2665f4015..325de38bb 100644
--- a/.github/workflows/ci_docker.yml
+++ b/.github/workflows/ci_docker.yml
@@ -81,6 +81,26 @@ jobs:
           key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
           restore-keys: ${{ runner.os }}-m2
 
+      - name: Set up swapfile path
+        run: |
+          sudo sysctl -w vm.max_map_count=262144
+          sudo sysctl -w fs.file-max=65536
+          sudo fallocate -l 5G /swapfile
+          sudo chmod 600 /swapfile
+          sudo mkswap /swapfile
+          sudo swapon /swapfile
+
+      - name: Remove unnecessary packages
+        run: |
+          echo "=== Before pruning ==="
+          df -h
+          sudo rm -rf /usr/share/dotnet
+          sudo rm -rf /usr/local/lib/android
+          sudo rm -rf /opt/ghc
+          echo
+          echo "=== After pruning ==="
+          df -h
+
       - name: Build Docker images
         run: mvn --batch-mode --update-snapshots -e -V clean package -DskipTests -Pdocker
         env:
diff --git a/docker/build-docker-images.sh b/docker/build-docker-images.sh
index 592daf27b..53a435d0f 100644
--- a/docker/build-docker-images.sh
+++ b/docker/build-docker-images.sh
@@ -138,10 +138,9 @@ docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/dataproxy:${tag}
 docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/audit:${tag}          inlong-audit/audit-docker/          --build-arg AUDIT_TARBALL=${AUDIT_TARBALL}
 docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/dashboard:${tag}      inlong-dashboard/                   --build-arg DASHBOARD_FILE=${DASHBOARD_FILE}
 docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/agent:${tag}          inlong-agent/agent-docker/          --build-arg AGENT_TARBALL=${AGENT_TARBALL}
-docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/tubemq-all:${tag}     inlong-tubemq/tubemq-docker/tubemq-all/     --build-arg TUBEMQ_TARBALL=${TUBEMQ_TARBALL}
 docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/tubemq-manager:${tag} inlong-tubemq/tubemq-docker/tubemq-manager/ --build-arg TUBEMQ_MANAGER_TARBALL=${TUBEMQ_MANAGER_TARBALL}
 if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
-  docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/tubemq-cpp:${tag}    inlong-tubemq/tubemq-docker/tubemq-cpp/
+  docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/tubemq-all:${tag}     inlong-tubemq/tubemq-docker/tubemq-all/     --build-arg TUBEMQ_TARBALL=${TUBEMQ_TARBALL}
   docker ${USE_BUILDX} build ${USE_PLATFORM} ${TYPE} -t inlong/tubemq-build:${tag}  inlong-tubemq/tubemq-docker/tubemq-build/
 fi
 
@@ -159,7 +158,6 @@ docker tag inlong/dashboard:${tag}       inlong/dashboard:latest${POSTFIX}
 docker tag inlong/agent:${tag}           inlong/agent:latest${POSTFIX}
 docker tag inlong/tubemq-all:${tag}      inlong/tubemq-all:latest${POSTFIX}
 if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
-  docker tag inlong/tubemq-cpp:${tag}   inlong/tubemq-cpp:latest${POSTFIX}
   docker tag inlong/tubemq-build:${tag} inlong/tubemq-build:latest${POSTFIX}
 fi
 
diff --git a/docker/publish-by-arch.sh b/docker/publish-by-arch.sh
index 8cfd5d30f..9ab0565c7 100644
--- a/docker/publish-by-arch.sh
+++ b/docker/publish-by-arch.sh
@@ -58,7 +58,6 @@ initTagImageForx86() {
   docker tag inlong/tubemq-all:latest${SRC_POSTFIX}      inlong/tubemq-all:latest${DES_POSTFIX}
   docker tag inlong/tubemq-build:latest${SRC_POSTFIX}    inlong/tubemq-build:latest${DES_POSTFIX}
   docker tag inlong/dashboard:latest${SRC_POSTFIX}       inlong/dashboard:latest${DES_POSTFIX}
-  docker tag inlong/tubemq-cpp:latest${SRC_POSTFIX}      inlong/tubemq-cpp:latest${DES_POSTFIX}
   docker tag inlong/audit:latest${SRC_POSTFIX}           inlong/audit:latest${DES_POSTFIX}
 
   docker tag inlong/manager:${MVN_VERSION}${SRC_POSTFIX}         inlong/manager:${MVN_VERSION}${DES_POSTFIX}
@@ -68,7 +67,6 @@ initTagImageForx86() {
   docker tag inlong/tubemq-all:${MVN_VERSION}${SRC_POSTFIX}      inlong/tubemq-all:${MVN_VERSION}${DES_POSTFIX}
   docker tag inlong/tubemq-build:${MVN_VERSION}${SRC_POSTFIX}    inlong/tubemq-build:${MVN_VERSION}${DES_POSTFIX}
   docker tag inlong/dashboard:${MVN_VERSION}${SRC_POSTFIX}       inlong/dashboard:${MVN_VERSION}${DES_POSTFIX}
-  docker tag inlong/tubemq-cpp:${MVN_VERSION}${SRC_POSTFIX}      inlong/tubemq-cpp:${MVN_VERSION}${DES_POSTFIX}
   docker tag inlong/audit:${MVN_VERSION}${SRC_POSTFIX}           inlong/audit:${MVN_VERSION}${DES_POSTFIX}
 }
 
@@ -94,9 +92,8 @@ tagImage() {
   docker tag inlong/tubemq-manager:latest${SRC_POSTFIX}  ${docker_registry_org}/tubemq-manager:latest${DES_POSTFIX}
   docker tag inlong/dashboard:latest${SRC_POSTFIX}       ${docker_registry_org}/dashboard:latest${DES_POSTFIX}
   docker tag inlong/audit:latest${SRC_POSTFIX}           ${docker_registry_org}/audit:latest${DES_POSTFIX}
-  docker tag inlong/tubemq-all:latest${SRC_POSTFIX}      ${docker_registry_org}/tubemq-all:latest${DES_POSTFIX}
   if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
-    docker tag inlong/tubemq-cpp:latest${SRC_POSTFIX}      ${docker_registry_org}/tubemq-cpp:latest${DES_POSTFIX}
+    docker tag inlong/tubemq-all:latest${SRC_POSTFIX}      ${docker_registry_org}/tubemq-all:latest${DES_POSTFIX}
     docker tag inlong/tubemq-build:latest${SRC_POSTFIX}    ${docker_registry_org}/tubemq-build:latest${DES_POSTFIX}
   fi
 
@@ -106,9 +103,8 @@ tagImage() {
   docker tag inlong/tubemq-manager:${MVN_VERSION}${SRC_POSTFIX}  ${docker_registry_org}/tubemq-manager:${MVN_VERSION}${DES_POSTFIX}
   docker tag inlong/dashboard:${MVN_VERSION}${SRC_POSTFIX}       ${docker_registry_org}/dashboard:${MVN_VERSION}${DES_POSTFIX}
   docker tag inlong/audit:${MVN_VERSION}${SRC_POSTFIX}           ${docker_registry_org}/audit:${MVN_VERSION}${DES_POSTFIX}
-  docker tag inlong/tubemq-all:${MVN_VERSION}${SRC_POSTFIX}      ${docker_registry_org}/tubemq-all:${MVN_VERSION}${DES_POSTFIX}
   if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
-    docker tag inlong/tubemq-cpp:${MVN_VERSION}${SRC_POSTFIX}      ${docker_registry_org}/tubemq-cpp:${MVN_VERSION}${DES_POSTFIX}
+    docker tag inlong/tubemq-all:${MVN_VERSION}${SRC_POSTFIX}      ${docker_registry_org}/tubemq-all:${MVN_VERSION}${DES_POSTFIX}
     docker tag inlong/tubemq-build:${MVN_VERSION}${SRC_POSTFIX}    ${docker_registry_org}/tubemq-build:${MVN_VERSION}${DES_POSTFIX}
   fi
   echo "End tagging images"
@@ -148,7 +144,6 @@ pushDefaultImage() {
   docker push inlong/tubemq-all:latest
   docker push inlong/tubemq-build:latest
   docker push inlong/dashboard:latest
-  docker push inlong/tubemq-cpp:latest
   docker push inlong/audit:latest
   docker push inlong/manager:${MVN_VERSION}
   docker push inlong/agent:${MVN_VERSION}
@@ -157,7 +152,6 @@ pushDefaultImage() {
   docker push inlong/tubemq-all:${MVN_VERSION}
   docker push inlong/tubemq-build:${MVN_VERSION}
   docker push inlong/dashboard:${MVN_VERSION}
-  docker push inlong/tubemq-cpp:${MVN_VERSION}
   docker push inlong/audit:${MVN_VERSION}
 }
 
@@ -183,10 +177,9 @@ pushImage() {
   docker push inlong/tubemq-manager:latest${SRC_POSTFIX}
   docker push inlong/dashboard:latest${SRC_POSTFIX}
   docker push inlong/audit:latest${SRC_POSTFIX}
-  docker push inlong/tubemq-all:latest${SRC_POSTFIX}
   if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
     docker push inlong/tubemq-build:latest${SRC_POSTFIX}
-    docker push inlong/tubemq-cpp:latest${SRC_POSTFIX}
+    docker push inlong/tubemq-all:latest${SRC_POSTFIX}
   fi
 
   docker push inlong/manager:${MVN_VERSION}${SRC_POSTFIX}
@@ -195,10 +188,9 @@ pushImage() {
   docker push inlong/tubemq-manager:${MVN_VERSION}${SRC_POSTFIX}
   docker push inlong/dashboard:${MVN_VERSION}${SRC_POSTFIX}
   docker push inlong/audit:${MVN_VERSION}${SRC_POSTFIX}
-  docker push inlong/tubemq-all:${MVN_VERSION}${SRC_POSTFIX}
   if [ "$BUILD_ARCH" = "$ARCH_X86" ]; then
     docker push inlong/tubemq-build:${MVN_VERSION}${SRC_POSTFIX}
-    docker push inlong/tubemq-cpp:${MVN_VERSION}${SRC_POSTFIX}
+    docker push inlong/tubemq-all:${MVN_VERSION}${SRC_POSTFIX}
   fi
 
   echo "Finished pushing images to inlong"
@@ -211,9 +203,8 @@ pushManifest() {
   docker manifest create --insecure --amend inlong/dataproxy:latest      inlong/dataproxy:latest-aarch64      inlong/dataproxy:latest-x86
   docker manifest create --insecure --amend inlong/dashboard:latest      inlong/dashboard:latest-aarch64      inlong/dashboard:latest-x86
   docker manifest create --insecure --amend inlong/audit:latest          inlong/audit:latest-aarch64          inlong/audit:latest-x86
-  docker manifest create --insecure --amend inlong/tubemq-all:latest     inlong/tubemq-all:latest-aarch64     inlong/tubemq-all:latest-x86
+  docker manifest create --insecure --amend inlong/tubemq-all:latest     inlong/tubemq-all:latest-x86
   docker manifest create --insecure --amend inlong/tubemq-manager:latest inlong/tubemq-manager:latest-aarch64 inlong/tubemq-manager:latest-x86
-  docker manifest create --insecure --amend inlong/tubemq-cpp:latest     inlong/tubemq-cpp:latest-x86
   docker manifest create --insecure --amend inlong/tubemq-build:latest   inlong/tubemq-build:latest-x86
 
   docker manifest push inlong/manager:latest
@@ -223,7 +214,6 @@ pushManifest() {
   docker manifest push inlong/tubemq-all:latest
   docker manifest push inlong/tubemq-build:latest
   docker manifest push inlong/dashboard:latest
-  docker manifest push inlong/tubemq-cpp:latest
   docker manifest push inlong/audit:latest
 
   docker manifest create --insecure --amend inlong/manager:${MVN_VERSION}        inlong/manager:${MVN_VERSION}-aarch64        inlong/manager:${MVN_VERSION}-x86
@@ -232,9 +222,8 @@ pushManifest() {
   docker manifest create --insecure --amend inlong/dashboard:${MVN_VERSION}      inlong/dashboard:${MVN_VERSION}-aarch64      inlong/dashboard:${MVN_VERSION}-x86
   docker manifest create --insecure --amend inlong/audit:${MVN_VERSION}          inlong/audit:${MVN_VERSION}-aarch64          inlong/audit:${MVN_VERSION}-x86
   docker manifest create --insecure --amend inlong/tubemq-manager:${MVN_VERSION} inlong/tubemq-manager:${MVN_VERSION}-aarch64 inlong/tubemq-manager:${MVN_VERSION}-x86
-  docker manifest create --insecure --amend inlong/tubemq-all:${MVN_VERSION}     inlong/tubemq-all:${MVN_VERSION}-aarch64     inlong/tubemq-all:${MVN_VERSION}-x86
+  docker manifest create --insecure --amend inlong/tubemq-all:${MVN_VERSION}     inlong/tubemq-all:${MVN_VERSION}-x86
   docker manifest create --insecure --amend inlong/tubemq-build:${MVN_VERSION}   inlong/tubemq-build:${MVN_VERSION}-x86
-  docker manifest create --insecure --amend inlong/tubemq-cpp:${MVN_VERSION}     inlong/tubemq-cpp:${MVN_VERSION}-x86
 
   docker manifest push inlong/manager:${MVN_VERSION}
   docker manifest push inlong/agent:${MVN_VERSION}
@@ -243,7 +232,6 @@ pushManifest() {
   docker manifest push inlong/tubemq-all:${MVN_VERSION}
   docker manifest push inlong/tubemq-build:${MVN_VERSION}
   docker manifest push inlong/dashboard:${MVN_VERSION}
-  docker manifest push inlong/tubemq-cpp:${MVN_VERSION}
   docker manifest push inlong/audit:${MVN_VERSION}
   echo "End pushing manifest"
 }


[inlong] 01/04: [INLONG-7614][Sort] Fix sort pulsar connector loss data when using admin url (#7623)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 553b92d042cbf0ea02a290a8f737ada0b76f1401
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Mar 16 16:19:47 2023 +0800

    [INLONG-7614][Sort] Fix sort pulsar connector loss data when using admin url (#7623)
---
 .../pulsar/{ => internal}/FlinkPulsarSource.java   | 16 +++----
 .../FlinkPulsarSourceWithoutAdmin.java}            | 34 +++++--------
 .../{withoutadmin => internal}/PulsarFetcher.java  | 55 +++++++++++++++++++++-
 .../PulsarMetadataReader.java                      | 50 +++++++++++++-------
 .../{withoutadmin => internal}/ReaderThread.java   |  2 +-
 .../pulsar/table/PulsarDynamicTableSource.java     | 10 ++--
 .../pulsar/withoutadmin/CallbackCollector.java     | 47 ------------------
 licenses/inlong-sort-connectors/LICENSE            | 11 +++--
 8 files changed, 117 insertions(+), 108 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
similarity index 98%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 9c83bbc58..3ec787d0d 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -45,8 +45,6 @@ import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
 import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
 import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
 import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
@@ -246,8 +244,11 @@ public class FlinkPulsarSource<T>
 
     protected String auditKeys;
 
+    protected String serverUrl;
+
     public FlinkPulsarSource(
             String adminUrl,
+            String serverUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
             Properties properties,
@@ -270,6 +271,7 @@ public class FlinkPulsarSource<T>
                 SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
         this.useMetrics =
                 SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
+        this.serverUrl = serverUrl;
 
         CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
 
@@ -559,6 +561,7 @@ public class FlinkPulsarSource<T>
     protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
         return new PulsarMetadataReader(
                 adminUrl,
+                serverUrl,
                 clientConfigurationData,
                 getSubscriptionName(),
                 caseInsensitiveParams,
@@ -1003,13 +1006,6 @@ public class FlinkPulsarSource<T>
                     }
                 }
                 return specificOffsets;
-            case EXTERNAL_SUBSCRIPTION:
-                Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
-                for (TopicRange topic : topics) {
-                    offsetsFromSubs.put(topic, metadataReader.getPositionFromSubscription(topic,
-                            subscriptionPosition));
-                }
-                return offsetsFromSubs;
         }
         return null;
     }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
similarity index 97%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
index 2a84ec041..62d1c7f10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -101,7 +101,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
  *
  * @param <T> The type of records produced by this data source.
  */
-public class FlinkPulsarSource<T>
+public class FlinkPulsarSourceWithoutAdmin<T>
         extends
             RichParallelSourceFunction<T>
         implements
@@ -109,7 +109,7 @@ public class FlinkPulsarSource<T>
             CheckpointListener,
             CheckpointedFunction {
 
-    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSource.class);
+    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSourceWithoutAdmin.class);
 
     /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
@@ -249,7 +249,7 @@ public class FlinkPulsarSource<T>
 
     private transient ListState<MetricState> metricStateListState;
 
-    public FlinkPulsarSource(
+    public FlinkPulsarSourceWithoutAdmin(
             String serverUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
@@ -312,7 +312,8 @@ public class FlinkPulsarSource<T>
      * @return The reader object, to allow function chaining.
      */
     @Deprecated
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
+            AssignerWithPunctuatedWatermarks<T> assigner) {
         checkNotNull(assigner);
 
         if (this.watermarkStrategy != null) {
@@ -352,7 +353,7 @@ public class FlinkPulsarSource<T>
      * @return The reader object, to allow function chaining.
      */
     @Deprecated
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
         checkNotNull(assigner);
 
         if (this.watermarkStrategy != null) {
@@ -388,7 +389,7 @@ public class FlinkPulsarSource<T>
      *
      * @return The consumer object, to allow function chaining.
      */
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
             WatermarkStrategy<T> watermarkStrategy) {
         checkNotNull(watermarkStrategy);
 
@@ -402,30 +403,16 @@ public class FlinkPulsarSource<T>
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromEarliest() {
+    public FlinkPulsarSourceWithoutAdmin<T> setStartFromEarliest() {
         this.startupMode = StartupMode.EARLIEST;
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromLatest() {
+    public FlinkPulsarSourceWithoutAdmin<T> setStartFromLatest() {
         this.startupMode = StartupMode.LATEST;
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName) {
-        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
-        this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
-        return this;
-    }
-
-    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName,
-            MessageId subscriptionPosition) {
-        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
-        this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
-        this.subscriptionPosition = checkNotNull(subscriptionPosition);
-        return this;
-    }
-
     // ------------------------------------------------------------------------
     // Work methods
     // ------------------------------------------------------------------------
@@ -538,6 +525,7 @@ public class FlinkPulsarSource<T>
 
     protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
         return new PulsarMetadataReader(
+                null,
                 serverUrl,
                 clientConfigurationData,
                 getSubscriptionName(),
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
similarity index 94%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
index ab278a1c0..e5f7632e6 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.pulsar.internal.ClosableBlockingQueue;
 import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
 import org.apache.flink.streaming.connectors.pulsar.internal.PoisonState;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicPartitionStateWithWatermarkGenerator;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
 import org.apache.flink.streaming.connectors.pulsar.internal.SourceContextWatermarkOutputAdapter;
@@ -392,7 +393,7 @@ public class PulsarFetcher<T> {
 
     /**
      * Emits a record attaching a timestamp to it.
-     * @param record The record to emit
+     * @param records The records to emit
      * @param partitionState The state of the pulsar partition from which the record was fetched
      * @param offset The offset of the corresponding pulsar record
      * @param pulsarEventTimestamp The timestamp of the pulsar record
@@ -434,6 +435,13 @@ public class PulsarFetcher<T> {
         }
     }
 
+    public void commitOffsetToPulsar(
+            Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+
+        doCommitOffsetToPulsar(removeEarliestAndLatest(offset), offsetCommitCallback);
+    }
+
     public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> offset) {
         Map<TopicRange, MessageId> result = new HashMap<>();
         for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
@@ -705,6 +713,49 @@ public class PulsarFetcher<T> {
         }
     }
 
+    protected void doCommitOffsetToPulsar(
+            Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+
+        try {
+            int retries = 0;
+            boolean success = false;
+            while (running) {
+                try {
+                    metadataReader.commitOffsetToCursor(offset);
+                    success = true;
+                    break;
+                } catch (Exception e) {
+                    log.warn("Failed to commit cursor to Pulsar.", e);
+                    if (retries >= commitMaxRetries) {
+                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries);
+                        throw e;
+                    }
+                    retries += 1;
+                    Thread.sleep(1000);
+                }
+            }
+            if (success) {
+                offsetCommitCallback.onSuccess();
+            } else {
+                return;
+            }
+        } catch (Exception e) {
+            if (running) {
+                offsetCommitCallback.onException(e);
+            } else {
+                return;
+            }
+        }
+
+        for (PulsarTopicState state : subscribedPartitionStates) {
+            MessageId off = offset.get(state.getTopicRange());
+            if (off != null) {
+                state.setCommittedOffset(off);
+            }
+        }
+    }
+
     public PulsarMetadataReader getMetaDataReader() {
         return this.metadataReader;
     }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
similarity index 84%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
index 00b0b53e6..10b44b370 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
@@ -15,17 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
 import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
 import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
@@ -85,7 +89,10 @@ public class PulsarMetadataReader implements AutoCloseable {
 
     private final SerializableRange range;
 
+    private PulsarAdmin admin;
+
     public PulsarMetadataReader(
+            String adminUrl,
             String serverUrl,
             ClientConfigurationData clientConf,
             String subscriptionName,
@@ -102,6 +109,9 @@ public class PulsarMetadataReader implements AutoCloseable {
         this.useExternalSubscription = useExternalSubscription;
         this.client = buildPulsarClient(serverUrl, clientConf, caseInsensitiveParams.get(AUTHENTICATION_TOKEN.key()));
         this.range = buildRange(caseInsensitiveParams);
+        if (adminUrl != null) {
+            this.admin = PulsarClientUtils.newAdminFromConf(adminUrl, clientConf);
+        }
     }
 
     private PulsarClient buildPulsarClient(
@@ -133,21 +143,8 @@ public class PulsarMetadataReader implements AutoCloseable {
         return SerializableRange.of(range);
     }
 
-    public PulsarMetadataReader(
-            String serverUrl,
-            ClientConfigurationData clientConf,
-            String subscriptionName,
-            Map<String, String> caseInsensitiveParams,
-            int indexOfThisSubtask,
-            int numParallelSubtasks) throws PulsarClientException {
-
-        this(serverUrl,
-                clientConf,
-                subscriptionName,
-                caseInsensitiveParams,
-                indexOfThisSubtask,
-                numParallelSubtasks,
-                false);
+    private String subscriptionNameFrom(TopicRange topicRange) {
+        return topicRange.isFullRange() ? subscriptionName : subscriptionName + topicRange.getPulsarRange();
     }
 
     @Override
@@ -242,6 +239,27 @@ public class PulsarMetadataReader implements AutoCloseable {
         return Collections.emptyList();
     }
 
+    /** Resets the subscription cursor of each topic range in {@code offset} to its message id via the admin client. */
+    public void commitOffsetToCursor(Map<TopicRange, MessageId> offset) {
+        Preconditions.checkNotNull(admin, "admin url should not be null");
+        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
+            TopicRange tp = entry.getKey();
+            try {
+                log.info("Committing offset {} to topic {}", entry.getValue(), tp);
+                admin.topics().resetCursor(tp.getTopic(), subscriptionNameFrom(tp), entry.getValue(), true);
+                log.info("Successfully committed offset {} to topic {}", entry.getValue(), tp);
+            } catch (PulsarAdminException e) {
+                // status 404/412 is logged and skipped; any other admin failure aborts the whole commit
+                if (e.getStatusCode() != 404 && e.getStatusCode() != 412) {
+                    throw new RuntimeException(String.format("Failed to commit cursor for %s", tp), e);
+                }
+                log.info("Cannot commit cursor since the topic {} has been deleted during execution", tp);
+            } catch (Exception e) {
+                throw new RuntimeException(String.format("Failed to commit cursor for %s", tp), e);
+            }
+        }
+    }
+
     /**
      * Designate the close of the metadata reader.
      */
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
similarity index 99%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
index 0815f5dca..295f4df10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
 import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index e9465fcb2..d8fcbafb9 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -54,7 +54,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.pulsar.withoutadmin.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSourceWithoutAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -273,9 +274,10 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
     private SourceFunction<RowData> createPulsarSource(
             ClientConfigurationData clientConfigurationData,
             PulsarDeserializationSchema<RowData> deserializationSchema) {
-        org.apache.inlong.sort.pulsar.FlinkPulsarSource source =
-                new org.apache.inlong.sort.pulsar.FlinkPulsarSource(
+        FlinkPulsarSource source =
+                new FlinkPulsarSource(
                         adminUrl,
+                        serviceUrl,
                         clientConfigurationData,
                         deserializationSchema,
                         properties,
@@ -310,7 +312,7 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
     private SourceFunction<RowData> createPulsarSourceWithoutAdmin(
             ClientConfigurationData clientConfigurationData,
             PulsarDeserializationSchema<RowData> deserializationSchema) {
-        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+        FlinkPulsarSourceWithoutAdmin<RowData> source = new FlinkPulsarSourceWithoutAdmin<>(
                 serviceUrl,
                 clientConfigurationData,
                 deserializationSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
deleted file mode 100644
index e61b7c3f7..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.withoutadmin;
-
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.function.ThrowingConsumer;
-
-/**
- * A collector supporting callback.
- */
-public class CallbackCollector<T> implements Collector<T> {
-
-    private final ThrowingConsumer<T, Exception> callback;
-
-    public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
-        this.callback = callback;
-    }
-
-    @Override
-    public void collect(T t) {
-        try {
-            callback.accept(t);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 19aafd5fc..69998097e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -475,11 +475,12 @@
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSink.java
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
  Source  : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.)
  License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE
 


[inlong] 03/04: [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ee9aaf10de9a3beeea897f3d9a2b80d92689b743
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Mar 16 18:43:17 2023 +0800

    [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)
---
 .../inlong/manager/common/enums/GroupStatus.java   |  8 +++++
 .../dao/mapper/StreamSourceEntityMapper.java       | 16 +++++++++
 .../resources/mappers/StreamSourceEntityMapper.xml | 42 +++++++++++++++++++++-
 .../service/node/AbstractDataNodeOperator.java     | 29 +++++++++++++--
 .../manager/service/node/DataNodeOperator.java     |  4 ++-
 .../manager/service/node/DataNodeServiceImpl.java  |  3 +-
 .../service/node/mysql/MySQLDataNodeOperator.java  | 15 ++++++++
 7 files changed, 112 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index 565b43576..c265a0ba8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -126,6 +126,14 @@ public enum GroupStatus {
                 || status == GroupStatus.CONFIG_FAILED;
     }
 
+    /**
+     * Returns true only for CONFIG_SUCCESSFUL and CONFIG_FAILED, the statuses that allow updating a stream source.
+     */
+    public static boolean allowedUpdateSource(GroupStatus status) {
+        boolean configSucceeded = GroupStatus.CONFIG_SUCCESSFUL == status;
+        return configSucceeded || GroupStatus.CONFIG_FAILED == status;
+    }
+
     /**
      * Checks whether the given status needs to delete the inlong stream first.
      */
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ba7ad89d1..99a06ac34 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -119,6 +119,12 @@ public interface StreamSourceEntityMapper {
      */
     List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+    /**
+     * Query the ids of the sources that need to be updated, filtered by clusterName, nodeName and sourceType
+     */
+    List<Integer> selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String clusterName,
+            @Param("nodeName") String nodeName, @Param("sourceType") String sourceType);
+
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
     int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@@ -148,6 +154,16 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
+    /**
+     * Update the status of the sources with the given ids
+     *
+     * @param idList ids of the sources to update
+     * @param status the new status to set
+     * @param operator name of the operator, recorded as the modifier
+     */
+    void updateStatusByIds(@Param("idList") List<Integer> idList, @Param("status") Integer status,
+            @Param("operator") String operator);
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 0f6aeb5bb..7c2df0199 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -319,7 +319,29 @@
             for update
         </where>
     </select>
-
+    <select id="selectNeedUpdateIdsByClusterAndDataNode" resultType="java.lang.Integer">
+        <!-- the stream is joined on group id and stream id, so a source is only checked against its own stream -->
+        select source.id
+        from stream_source source, inlong_stream stream, inlong_group inlong_group
+        <where>
+            source.is_deleted = 0
+            and inlong_group.inlong_group_id = source.inlong_group_id
+            and inlong_group.is_deleted = 0 and inlong_group.status in (120, 130)
+            and stream.inlong_group_id = source.inlong_group_id
+            and stream.inlong_stream_id = source.inlong_stream_id
+            and stream.is_deleted = 0 and stream.status in (120, 130)
+            and source.status not in (99, 110)
+            <if test="clusterName != null">
+                and source.inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+            </if>
+            <if test="nodeName != null">
+                and source.data_node_name = #{nodeName, jdbcType=VARCHAR}
+            </if>
+            <if test="sourceType != null">
+                and source.source_type = #{sourceType, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
     <update id="updateByRelatedId">
         update stream_source
         <set>
@@ -458,6 +480,24 @@
             modify_time = modify_time
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <update id="updateStatusByIds">
+        update stream_source
+        <set>
+            previous_status = status,
+            status          = #{status, jdbcType=INTEGER},
+            modifier        = #{operator, jdbcType=VARCHAR},
+            version         = version + 1
+        </set>
+        <where>
+            is_deleted = 0
+            <choose> <!-- a null or empty idList must match no row rather than every non-deleted row -->
+                <when test="idList != null and idList.size() > 0">
+                    <foreach item="item" collection="idList" open="and id in (" close=")" separator=",">#{item}</foreach>
+                </when>
+                <otherwise>and 1 = 0</otherwise>
+            </choose>
+        </where>
+    </update>
 
     <delete id="deleteByRelatedId">
         delete
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index a2e4c2eb1..efa174fd9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -19,11 +19,15 @@ package org.apache.inlong.manager.service.node;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.slf4j.Logger;
@@ -33,6 +37,7 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,6 +49,12 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
 
     @Autowired
     protected DataNodeEntityMapper dataNodeEntityMapper;
+    @Autowired
+    protected StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    protected InlongGroupEntityMapper groupMapper;
+    @Autowired
+    protected InlongStreamEntityMapper streamMapper;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -72,7 +83,6 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
         DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
         // set the ext params
         this.setTargetEntity(request, entity);
-        this.updateRelatedStreamSource(request);
         entity.setModifier(operator);
         int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
@@ -95,7 +105,22 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
     }
 
     @Override
-    public void updateRelatedStreamSource(DataNodeRequest request) {
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
         LOGGER.info("do nothing for the data node type ={}", request.getType());
     }
+
+    public void retryStreamSourceByDataNodeNameAndType(String dataNodeName, String type, String operator) {
+        Integer status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
+        List<Integer> needUpdateIds = sourceMapper.selectNeedUpdateIdsByClusterAndDataNode(null, dataNodeName, type);
+        try {
+            if (needUpdateIds != null && !needUpdateIds.isEmpty()) { // never bulk-update with an empty id list
+                sourceMapper.updateStatusByIds(needUpdateIds, status, operator);
+                LOGGER.info("success to update stream source status by dataNodeName={}, status={}, by operator={}",
+                        dataNodeName, status, operator);
+            }
+        } catch (Exception e) { // best effort: a failed status update is logged, not rethrown
+            LOGGER.error("failed to update stream source status by dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator, e);
+        }
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index a0618f4bb..58a12b732 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -83,7 +83,9 @@ public interface DataNodeOperator {
      * Update related stream source.
      *
      * @param request data node request
+     * @param entity data node entity
+     * @param operator operator
      */
-    void updateRelatedStreamSource(DataNodeRequest request);
+    void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index d44f32d03..b2d094e64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -200,7 +200,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         }
         DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, operator);
-
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator);
         LOGGER.info("success to update data node={}", request);
         return true;
     }
@@ -238,6 +238,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         }
         DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, opInfo.getName());
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName());
         return true;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 7bc1f8f4b..f3e108ab5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.node.mysql;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -38,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.sql.Connection;
+import java.util.Objects;
 
 /**
  * MySQL data node operator
@@ -106,4 +108,17 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
+        // retry the related sources only when the url, backup url, username or token differs from the entity
+        MySQLDataNodeRequest updated = (MySQLDataNodeRequest) request;
+        MySQLDataNodeInfo current = (MySQLDataNodeInfo) this.getFromEntity(entity);
+        if (!Objects.equals(updated.getUrl(), current.getUrl())
+                || !Objects.equals(updated.getBackupUrl(), current.getBackupUrl())
+                || !Objects.equals(updated.getUsername(), current.getUsername())
+                || !Objects.equals(updated.getToken(), current.getToken())) {
+            retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
+        }
+    }
+
 }