You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 10:56:50 UTC
[inlong] 01/04: [INLONG-7614][Sort] Fix Sort Pulsar connector losing data when using the admin URL (#7623)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 553b92d042cbf0ea02a290a8f737ada0b76f1401
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Mar 16 16:19:47 2023 +0800
[INLONG-7614][Sort] Fix Sort Pulsar connector losing data when using the admin URL (#7623)
---
.../pulsar/{ => internal}/FlinkPulsarSource.java | 16 +++----
.../FlinkPulsarSourceWithoutAdmin.java} | 34 +++++--------
.../{withoutadmin => internal}/PulsarFetcher.java | 55 +++++++++++++++++++++-
.../PulsarMetadataReader.java | 50 +++++++++++++-------
.../{withoutadmin => internal}/ReaderThread.java | 2 +-
.../pulsar/table/PulsarDynamicTableSource.java | 10 ++--
.../pulsar/withoutadmin/CallbackCollector.java | 47 ------------------
licenses/inlong-sort-connectors/LICENSE | 11 +++--
8 files changed, 117 insertions(+), 108 deletions(-)
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
similarity index 98%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index 9c83bbc58..3ec787d0d 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar;
+package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -45,8 +45,6 @@ import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.MessageIdSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSourceStateSerializer;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
@@ -246,8 +244,11 @@ public class FlinkPulsarSource<T>
protected String auditKeys;
+ protected String serverUrl;
+
public FlinkPulsarSource(
String adminUrl,
+ String serverUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties,
@@ -270,6 +271,7 @@ public class FlinkPulsarSource<T>
SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
this.useMetrics =
SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
+ this.serverUrl = serverUrl;
CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
@@ -559,6 +561,7 @@ public class FlinkPulsarSource<T>
protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
return new PulsarMetadataReader(
adminUrl,
+ serverUrl,
clientConfigurationData,
getSubscriptionName(),
caseInsensitiveParams,
@@ -1003,13 +1006,6 @@ public class FlinkPulsarSource<T>
}
}
return specificOffsets;
- case EXTERNAL_SUBSCRIPTION:
- Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
- for (TopicRange topic : topics) {
- offsetsFromSubs.put(topic, metadataReader.getPositionFromSubscription(topic,
- subscriptionPosition));
- }
- return offsetsFromSubs;
}
return null;
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
similarity index 97%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
index 2a84ec041..62d1c7f10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
@@ -101,7 +101,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
*
* @param <T> The type of records produced by this data source.
*/
-public class FlinkPulsarSource<T>
+public class FlinkPulsarSourceWithoutAdmin<T>
extends
RichParallelSourceFunction<T>
implements
@@ -109,7 +109,7 @@ public class FlinkPulsarSource<T>
CheckpointListener,
CheckpointedFunction {
- private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSource.class);
+ private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSourceWithoutAdmin.class);
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
@@ -249,7 +249,7 @@ public class FlinkPulsarSource<T>
private transient ListState<MetricState> metricStateListState;
- public FlinkPulsarSource(
+ public FlinkPulsarSourceWithoutAdmin(
String serverUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
@@ -312,7 +312,8 @@ public class FlinkPulsarSource<T>
* @return The reader object, to allow function chaining.
*/
@Deprecated
- public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
+ public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
+ AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.watermarkStrategy != null) {
@@ -352,7 +353,7 @@ public class FlinkPulsarSource<T>
* @return The reader object, to allow function chaining.
*/
@Deprecated
- public FlinkPulsarSource<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
+ public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.watermarkStrategy != null) {
@@ -388,7 +389,7 @@ public class FlinkPulsarSource<T>
*
* @return The consumer object, to allow function chaining.
*/
- public FlinkPulsarSource<T> assignTimestampsAndWatermarks(
+ public FlinkPulsarSourceWithoutAdmin<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy) {
checkNotNull(watermarkStrategy);
@@ -402,30 +403,16 @@ public class FlinkPulsarSource<T>
return this;
}
- public FlinkPulsarSource<T> setStartFromEarliest() {
+ public FlinkPulsarSourceWithoutAdmin<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
return this;
}
- public FlinkPulsarSource<T> setStartFromLatest() {
+ public FlinkPulsarSourceWithoutAdmin<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
return this;
}
- public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName) {
- this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
- this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
- return this;
- }
-
- public FlinkPulsarSource<T> setStartFromSubscription(String externalSubscriptionName,
- MessageId subscriptionPosition) {
- this.startupMode = StartupMode.EXTERNAL_SUBSCRIPTION;
- this.externalSubscriptionName = checkNotNull(externalSubscriptionName);
- this.subscriptionPosition = checkNotNull(subscriptionPosition);
- return this;
- }
-
// ------------------------------------------------------------------------
// Work methods
// ------------------------------------------------------------------------
@@ -538,6 +525,7 @@ public class FlinkPulsarSource<T>
protected PulsarMetadataReader createMetadataReader() throws PulsarClientException {
return new PulsarMetadataReader(
+ null,
serverUrl,
clientConfigurationData,
getSubscriptionName(),
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
similarity index 94%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
index ab278a1c0..e5f7632e6 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
import org.apache.flink.streaming.connectors.pulsar.internal.PoisonState;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicPartitionStateWithWatermarkGenerator;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarTopicState;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceContextWatermarkOutputAdapter;
@@ -392,7 +393,7 @@ public class PulsarFetcher<T> {
/**
* Emits a record attaching a timestamp to it.
- * @param record The record to emit
+ * @param records The records to emit
* @param partitionState The state of the pulsar partition from which the record was fetched
* @param offset The offset of the corresponding pulsar record
* @param pulsarEventTimestamp The timestamp of the pulsar record
@@ -434,6 +435,13 @@ public class PulsarFetcher<T> {
}
}
+    /**
+     * Commits the given offsets to Pulsar.
+     *
+     * <p>The offset map is first passed through {@link #removeEarliestAndLatest(Map)}; the
+     * resulting map is then handed to {@code doCommitOffsetToPulsar} together with the callback.
+     *
+     * @param offset offsets to commit, keyed by topic range
+     * @param offsetCommitCallback callback forwarded to {@code doCommitOffsetToPulsar}
+     */
+    public void commitOffsetToPulsar(
+            Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+        Map<TopicRange, MessageId> committable = removeEarliestAndLatest(offset);
+        doCommitOffsetToPulsar(committable, offsetCommitCallback);
+    }
+
public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> offset) {
Map<TopicRange, MessageId> result = new HashMap<>();
for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
@@ -705,6 +713,49 @@ public class PulsarFetcher<T> {
}
}
+    /**
+     * Commits the given offsets to Pulsar through the metadata reader and reports the outcome
+     * to the callback.
+     *
+     * <p>A failed commit is retried after a one-second pause until {@code commitMaxRetries}
+     * retries have been used or the fetcher is no longer running. The committed offset of each
+     * subscribed partition state is updated only when the commit itself succeeded; after a
+     * failed commit the callback is notified (if the fetcher is still running) and the states
+     * are left untouched, so they never report an offset that was not actually committed.
+     *
+     * @param offset offsets to commit, keyed by topic range
+     * @param offsetCommitCallback callback notified through {@code onSuccess} or
+     *        {@code onException}
+     */
+    protected void doCommitOffsetToPulsar(
+            Map<TopicRange, MessageId> offset,
+            PulsarCommitCallback offsetCommitCallback) {
+
+        // Tracked outside the try block so the catch clause can tell a failed commit apart
+        // from an exception raised after the commit already went through.
+        boolean success = false;
+        try {
+            int retries = 0;
+            while (running) {
+                try {
+                    metadataReader.commitOffsetToCursor(offset);
+                    success = true;
+                    break;
+                } catch (Exception e) {
+                    log.warn("Failed to commit cursor to Pulsar.", e);
+                    if (retries >= commitMaxRetries) {
+                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries);
+                        throw e;
+                    }
+                    retries += 1;
+                    Thread.sleep(1000);
+                }
+            }
+            if (!success) {
+                // The fetcher stopped before any attempt succeeded; nothing to report.
+                return;
+            }
+            offsetCommitCallback.onSuccess();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Thread.sleep cleared the interrupt flag when it threw; restore it.
+                Thread.currentThread().interrupt();
+            }
+            if (!running) {
+                return;
+            }
+            offsetCommitCallback.onException(e);
+            if (!success) {
+                // The commit failed: do not record these offsets as committed.
+                return;
+            }
+        }
+
+        for (PulsarTopicState state : subscribedPartitionStates) {
+            MessageId off = offset.get(state.getTopicRange());
+            if (off != null) {
+                state.setCommittedOffset(off);
+            }
+        }
+    }
+
public PulsarMetadataReader getMetaDataReader() {
return this.metadataReader;
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
similarity index 84%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
index 00b0b53e6..10b44b370 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
@@ -85,7 +89,10 @@ public class PulsarMetadataReader implements AutoCloseable {
private final SerializableRange range;
+ private PulsarAdmin admin;
+
public PulsarMetadataReader(
+ String adminUrl,
String serverUrl,
ClientConfigurationData clientConf,
String subscriptionName,
@@ -102,6 +109,9 @@ public class PulsarMetadataReader implements AutoCloseable {
this.useExternalSubscription = useExternalSubscription;
this.client = buildPulsarClient(serverUrl, clientConf, caseInsensitiveParams.get(AUTHENTICATION_TOKEN.key()));
this.range = buildRange(caseInsensitiveParams);
+ if (adminUrl != null) {
+ this.admin = PulsarClientUtils.newAdminFromConf(adminUrl, clientConf);
+ }
}
private PulsarClient buildPulsarClient(
@@ -133,21 +143,8 @@ public class PulsarMetadataReader implements AutoCloseable {
return SerializableRange.of(range);
}
- public PulsarMetadataReader(
- String serverUrl,
- ClientConfigurationData clientConf,
- String subscriptionName,
- Map<String, String> caseInsensitiveParams,
- int indexOfThisSubtask,
- int numParallelSubtasks) throws PulsarClientException {
-
- this(serverUrl,
- clientConf,
- subscriptionName,
- caseInsensitiveParams,
- indexOfThisSubtask,
- numParallelSubtasks,
- false);
+    /**
+     * Returns the subscription name to use for the given topic range: the plain
+     * {@code subscriptionName} for a full range, otherwise {@code subscriptionName} with the
+     * string form of the Pulsar range appended.
+     */
+    private String subscriptionNameFrom(TopicRange topicRange) {
+        if (topicRange.isFullRange()) {
+            return subscriptionName;
+        }
+        return subscriptionName + topicRange.getPulsarRange();
     }
@Override
@@ -242,6 +239,27 @@ public class PulsarMetadataReader implements AutoCloseable {
return Collections.emptyList();
}
+    /**
+     * Resets the subscription cursor of every topic range in {@code offset} to its message id
+     * using the admin client.
+     *
+     * <p>The admin client must have been created (it is only built when an admin URL was
+     * supplied); otherwise the precondition check fails. An admin exception with status 404 or
+     * 412 is logged and that entry is skipped; any other failure is rethrown wrapped in a
+     * {@link RuntimeException}, which aborts the remaining entries.
+     *
+     * @param offset message id to reset the cursor to, keyed by topic range
+     */
+    public void commitOffsetToCursor(Map<TopicRange, MessageId> offset) {
+        Preconditions.checkNotNull(admin, "admin url should not be null");
+        for (Map.Entry<TopicRange, MessageId> entry : offset.entrySet()) {
+            TopicRange tp = entry.getKey();
+            MessageId position = entry.getValue();
+            try {
+                log.info("Committing offset {} to topic {}", position, tp);
+                admin.topics().resetCursor(tp.getTopic(), subscriptionNameFrom(tp), position, true);
+                log.info("Successfully committed offset {} to topic {}", position, tp);
+            } catch (Throwable e) {
+                boolean ignorable = false;
+                if (e instanceof PulsarAdminException) {
+                    int status = ((PulsarAdminException) e).getStatusCode();
+                    ignorable = status == 404 || status == 412;
+                }
+                if (!ignorable) {
+                    throw new RuntimeException(
+                            String.format("Failed to commit cursor for %s", tp), e);
+                }
+                log.info("Cannot commit cursor since the topic {} has been deleted during execution", tp);
+            }
+        }
+    }
+
/**
* Designate the close of the metadata reader.
*/
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
similarity index 99%
rename from inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
rename to inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
index 0815f5dca..295f4df10 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.pulsar.internal;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.ExceptionProxy;
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index e9465fcb2..d8fcbafb9 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -54,7 +54,8 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.pulsar.withoutadmin.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSource;
+import org.apache.inlong.sort.pulsar.internal.FlinkPulsarSourceWithoutAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -273,9 +274,10 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
private SourceFunction<RowData> createPulsarSource(
ClientConfigurationData clientConfigurationData,
PulsarDeserializationSchema<RowData> deserializationSchema) {
- org.apache.inlong.sort.pulsar.FlinkPulsarSource source =
- new org.apache.inlong.sort.pulsar.FlinkPulsarSource(
+ FlinkPulsarSource source =
+ new FlinkPulsarSource(
adminUrl,
+ serviceUrl,
clientConfigurationData,
deserializationSchema,
properties,
@@ -310,7 +312,7 @@ public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadin
private SourceFunction<RowData> createPulsarSourceWithoutAdmin(
ClientConfigurationData clientConfigurationData,
PulsarDeserializationSchema<RowData> deserializationSchema) {
- FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+ FlinkPulsarSourceWithoutAdmin<RowData> source = new FlinkPulsarSourceWithoutAdmin<>(
serviceUrl,
clientConfigurationData,
deserializationSchema,
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
deleted file mode 100644
index e61b7c3f7..000000000
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.pulsar.withoutadmin;
-
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.function.ThrowingConsumer;
-
-/**
- * A collector supporting callback.
- */
-public class CallbackCollector<T> implements Collector<T> {
-
- private final ThrowingConsumer<T, Exception> callback;
-
- public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
- this.callback = callback;
- }
-
- @Override
- public void collect(T t) {
- try {
- callback.accept(t);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 19aafd5fc..69998097e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -475,11 +475,12 @@
inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSink.java
inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
- inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
- inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarFetcher.java
- inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/PulsarMetadataReader.java
- inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/ReaderThread.java
- inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarFetcher.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/ReaderThread.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+ inlong-sort/sort-connectors/sort-pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSourceWithoutAdmin.java
Source : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.)
License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE