You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 10:56:50 UTC

[inlong] 01/04: [INLONG-7614][Sort] Fix sort pulsar connector loss data when using admin url (#7623)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 553b92d042cbf0ea02a290a8f737ada0b76f1401
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Mar 16 16:19:47 2023 +0800

    [INLONG-7614][Sort] Fix sort pulsar connector loss data when using admin url (#7623)
---
 .../pulsar/{ => internal}/FlinkPulsarSource.java   | 16 +++----
 .../FlinkPulsarSourceWithoutAdmin.java}            | 34 +++++--------
 .../{withoutadmin => internal}/PulsarFetcher.java  | 55 +++++++++++++++++++++-
 .../PulsarMetadataReader.java                      | 50 +++++++++++++-------
 .../{withoutadmin => internal}/ReaderThread.java   |  2 +-
 .../pulsar/table/PulsarDynamicTableSource.java     | 10 ++--
 .../pulsar/withoutadmin/CallbackCollector.java     | 47 ------------------
 licenses/inlong-sort-connectors/LICENSE            | 11 +++--
 8 files changed, 117 insertions(+), 108 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
similarity index 98%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 9c83bbc58..3ec787d0d 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -45,8 +45,6 @@ import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
 import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
 import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
 import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
@@ -246,8 +244,11 @@ public class FlinkPulsarSource<T>
 
     protected String auditKeys;
 
+    protected String serverUrl;
+
     public FlinkPulsarSource(
             String adminUrl,
+            String serverUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
             Properties properties,
@@ -270,6 +271,7 @@ public class FlinkPulsarSource<T>
                 SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
         this.useMetrics =
                 SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
+        this.serverUrl = serverUrl;
 
         CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
 
@@ -559,6 +561,7 @@ public class FlinkPulsarSource<T>
     protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
         return new PulsarMetadataReader(
                 adminUrl,
+                serverUrl,
                 clientConfigurationData,
                 getSubscriptionName(),
                 caseInsensitiveParams,
@@ -1003,13 +1006,6 @@ public class FlinkPulsarSource<T>
                     }
                 }
                 return specificOffsets;
-            case EXTERNAL_SUBSCRIPTION:
-                Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
-                for (TopicRange topic : topics) {
-                    offsetsFromSubs.put(topic, metadataReader.getPositionFromSubscription(topic,
-                            subscriptionPosition));
-                }
-                return offsetsFromSubs;
         }
         return null;
     }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
similarity index 97%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
index 2a84ec041..62d1c7f10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -101,7 +101,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
  *
  * @param <T> The type of records produced by this data source.
  */
-public class FlinkPulsarSource<T>
+public class FlinkPulsarSourceWithoutAdmin<T>
         extends
             RichParallelSourceFunction<T>
         implements
@@ -109,7 +109,7 @@ public class FlinkPulsarSource<T>
             CheckpointListener,
             CheckpointedFunction {
 
-    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSource.class);
+    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSourceWithoutAdmin.class);
 
     /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
@@ -249,7 +249,7 @@ public class FlinkPulsarSource<T>
 
     private transient ListState<MetricState> metricStateListState;
 
-    public FlinkPulsarSource(
+    public FlinkPulsarSourceWithoutAdmin(
             String serverUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
@@ -312,7 +312,8 @@ public class FlinkPulsarSource<T>
      * @return The reader object, to allow function chaining.
      */
     @Deprecated
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
+            AssignerWithPunctuatedWatermarks<T> assigner) {
         checkNotNull(assigner);
 
         if (this.watermarkStrategy != null) {
@@ -352,7 +353,7 @@ public class FlinkPulsarSource<T>
      * @return The reader object, to allow function chaining.
      */
     @Deprecated
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
         checkNotNull(assigner);
 
         if (this.watermarkStrategy != null) {
@@ -388,7 +389,7 @@ public class FlinkPulsarSource<T>
      *
      * @return The consumer object, to allow function chaining.
      */
-    public FlinkPulsarSource<T> assignTimestampsAndWatermarks(
+    public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
             WatermarkStrategy<T> watermarkStrategy) {
         checkNotNull(watermarkStrategy);
 
@@ -402,30 +403,16 @@ public class FlinkPulsarSource<T>
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromEarliest() {
+    public FlinkPulsarSourceWithoutAdmin<T> setStartFromEarliest() {
         this.startupMode = StartupMode.EARLIEST;
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromLatest() {
+    public FlinkPulsarSourceWithoutAdmin<T> setStartFromLatest() {
         this.startupMode = StartupMode.LATEST;
         return this;
     }
 
-    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName) {
-        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
-        this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
-        return this;
-    }
-
-    public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName,
-            MessageId subscriptionPosition) {
-        this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
-        this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
-        this.subscriptionPosition = checkNotNull(subscriptionPosition);
-        return this;
-    }
-
     // ------------------------------------------------------------------------
     // Work methods
     // ------------------------------------------------------------------------
@@ -538,6 +525,7 @@ public class FlinkPulsarSource<T>
 
     protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
         return new PulsarMetadataReader(
+                null,
                 serverUrl,
                 clientConfigurationData,
                 getSubscriptionName(),
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
similarity index 94%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
index ab278a1c0..e5f7632e6 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.pulsar.internal.ClosableBlockingQueue;
 import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
 import org.apache.flink.streaming.connectors.pulsar.internal.PoisonState;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicPartitionStateWithWatermarkGenerator;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
 import org.apache.flink.streaming.connectors.pulsar.internal.SourceContextWatermarkOutputAdapter;
@@ -392,7 +393,7 @@ public class PulsarFetcher<T> {
 
     /**
      * Emits a record attaching a timestamp to it.
-     * @param record The record to emit
+     * @param records The records to emit
      * @param partitionState The state of the pulsar partition from which the record was fetched
      * @param offset The offset of the corresponding pulsar record
      * @param pulsarEventTimestamp The timestamp of the pulsar record
@@ -434,6 +435,13 @@ public class PulsarFetcher<T> {
         }
     }
 
+    /** Filters {@code offset} through removeEarliestAndLatest and commits what remains. */
+    public void commitOffsetToPulsar(Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+        Map<TopicRange, MessageId> filtered = removeEarliestAndLatest(offset);
+        doCommitOffsetToPulsar(filtered, offsetCommitCallback);
+    }
+
     public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> offset) {
         Map<TopicRange, MessageId> result = new HashMap<>();
         for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
@@ -705,6 +713,49 @@ public class PulsarFetcher<T> {
         }
     }
 
+    /**
+     * Commits {@code offset} through the metadata reader, retrying a failed attempt after one
+     * second until {@code commitMaxRetries} is reached; offsets are recorded only on success.
+     */
+    protected void doCommitOffsetToPulsar(
+            Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+        boolean success = false;
+        try {
+            int retries = 0;
+            while (running && !success) {
+                try {
+                    metadataReader.commitOffsetToCursor(offset);
+                    success = true;
+                } catch (Exception e) {
+                    log.warn("Failed to commit cursor to Pulsar.", e);
+                    if (retries >= commitMaxRetries) {
+                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries);
+                        throw e;
+                    }
+                    retries += 1;
+                    Thread.sleep(1000);
+                }
+            }
+            if (success) {
+                offsetCommitCallback.onSuccess();
+            }
+        } catch (Exception e) {
+            if (running) {
+                offsetCommitCallback.onException(e);
+            }
+        }
+        if (!success) {
+            return; // commit failed or fetcher stopped: never mark these offsets as committed
+        }
+        for (PulsarTopicState state : subscribedPartitionStates) {
+            MessageId off = offset.get(state.getTopicRange());
+            if (off != null) {
+                state.setCommittedOffset(off);
+            }
+        }
+    }
+
     public PulsarMetadataReader getMetaDataReader() {
         return this.metadataReader;
     }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
similarity index 84%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
index 00b0b53e6..10b44b370 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
@@ -15,17 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
 import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
 import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
 import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
 import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
@@ -85,7 +89,10 @@ public class PulsarMetadataReader implements AutoCloseable {
 
     private final SerializableRange range;
 
+    private PulsarAdmin admin;
+
     public PulsarMetadataReader(
+            String adminUrl,
             String serverUrl,
             ClientConfigurationData clientConf,
             String subscriptionName,
@@ -102,6 +109,9 @@ public class PulsarMetadataReader implements AutoCloseable {
         this.useExternalSubscription = useExternalSubscription;
         this.client = buildPulsarClient(serverUrl, clientConf, caseInsensitiveParams.get(AUTHENTICATION_TOKEN.key()));
         this.range = buildRange(caseInsensitiveParams);
+        if (adminUrl != null) {
+            this.admin = PulsarClientUtils.newAdminFromConf(adminUrl, clientConf);
+        }
     }
 
     private PulsarClient buildPulsarClient(
@@ -133,21 +143,8 @@ public class PulsarMetadataReader implements AutoCloseable {
         return SerializableRange.of(range);
     }
 
-    public PulsarMetadataReader(
-            String serverUrl,
-            ClientConfigurationData clientConf,
-            String subscriptionName,
-            Map<String, String> caseInsensitiveParams,
-            int indexOfThisSubtask,
-            int numParallelSubtasks) throws PulsarClientException {
-
-        this(serverUrl,
-                clientConf,
-                subscriptionName,
-                caseInsensitiveParams,
-                indexOfThisSubtask,
-                numParallelSubtasks,
-                false);
+    private String subscriptionNameFrom(TopicRange topicRange) {
+        return topicRange.isFullRange() ? subscriptionName : subscriptionName + topicRange.getPulsarRange();
     }
 
     @Override
@@ -242,6 +239,27 @@ public class PulsarMetadataReader implements AutoCloseable {
         return Collections.emptyList();
     }
 
+    /** Resets the subscription cursor of each topic range in {@code offset} to its message id. */
+    public void commitOffsetToCursor(Map<TopicRange, MessageId> offset) {
+        Preconditions.checkNotNull(admin, "admin url should not be null");
+        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
+            TopicRange tp = entry.getKey();
+            MessageId messageId = entry.getValue();
+            try {
+                log.info("Committing offset {} to topic {}", messageId, tp);
+                admin.topics().resetCursor(tp.getTopic(), subscriptionNameFrom(tp), messageId, true);
+                log.info("Successfully committed offset {} to topic {}", messageId, tp);
+            } catch (Throwable e) {
+                int status = e instanceof PulsarAdminException ? ((PulsarAdminException) e).getStatusCode() : -1;
+                if (status != 404 && status != 412) {
+                    throw new RuntimeException(String.format("Failed to commit cursor for %s", tp), e);
+                }
+                // Admin errors with status 404 or 412 are tolerated and only logged.
+                log.info("Cannot commit cursor since the topic {} has been deleted during execution", tp);
+            }
+        }
+    }
+
     /**
      * Designate the close of the metadata reader.
      */
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
similarity index 99%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
index 0815f5dca..295f4df10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
 
 import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
 import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index e9465fcb2..d8fcbafb9 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -54,7 +54,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.pulsar.withoutadmin.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSourceWithoutAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -273,9 +274,10 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
     private SourceFunction<RowData> createPulsarSource(
             ClientConfigurationData clientConfigurationData,
             PulsarDeserializationSchema<RowData> deserializationSchema) {
-        org.apache.inlong.sort.pulsar.FlinkPulsarSource source =
-                new org.apache.inlong.sort.pulsar.FlinkPulsarSource(
+        FlinkPulsarSource source =
+                new FlinkPulsarSource(
                         adminUrl,
+                        serviceUrl,
                         clientConfigurationData,
                         deserializationSchema,
                         properties,
@@ -310,7 +312,7 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
     private SourceFunction<RowData> createPulsarSourceWithoutAdmin(
             ClientConfigurationData clientConfigurationData,
             PulsarDeserializationSchema<RowData> deserializationSchema) {
-        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+        FlinkPulsarSourceWithoutAdmin<RowData> source = new FlinkPulsarSourceWithoutAdmin<>(
                 serviceUrl,
                 clientConfigurationData,
                 deserializationSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
deleted file mode 100644
index e61b7c3f7..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.withoutadmin;
-
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.function.ThrowingConsumer;
-
-/**
- * A collector supporting callback.
- */
-public class CallbackCollector<T> implements Collector<T> {
-
-    private final ThrowingConsumer<T, Exception> callback;
-
-    public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
-        this.callback = callback;
-    }
-
-    @Override
-    public void collect(T t) {
-        try {
-            callback.accept(t);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 19aafd5fc..69998097e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -475,11 +475,12 @@
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSink.java
       inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
-      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+      inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
  Source  : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.)
  License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE