Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/26 13:36:27 UTC
[incubator-inlong] branch master updated: [INLONG-742] Sort Support Pulsar (#1481)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7dadc59 [INLONG-742] Sort Support Pulsar (#1481)
7dadc59 is described below
commit 7dadc59609d1734e1d0550992f24a1414ddc3b95
Author: Kevin Wen <89...@users.noreply.github.com>
AuthorDate: Thu Aug 26 21:36:21 2021 +0800
[INLONG-742] Sort Support Pulsar (#1481)
---
inlong-sort/pom.xml | 13 +
.../inlong/sort/configuration/Constants.java | 5 +
.../apache/inlong/sort/protocol/DataFlowInfo.java | 46 +-
.../sort/protocol/source/PulsarSourceInfo.java | 19 +
inlong-sort/sort-core/pom.xml | 32 ++
.../org/apache/inlong/sort/flink/SourceEvent.java | 82 +++
.../sort/flink/hive/HiveMultiTenantWriter.java | 3 +-
.../MultiTenantFunctionInitializationContext.java | 24 +-
.../flink/pulsar/MultiTenancyPulsarConsumer.java | 171 ++++++
.../pulsar/MultiTopicPulsarSourceFunction.java | 201 +++++++
.../flink/pulsar/PulsarDeserializationSchema.java | 52 ++
.../inlong/sort/flink/pulsar/PulsarOptions.java | 122 +++++
.../sort/flink/pulsar/PulsarSourceFunction.java | 580 +++++++++++++++++++++
.../inlong/sort/flink/pulsar/PulsarUtils.java | 223 ++++++++
.../SerializedRecordDeserializationSchema.java | 46 ++
.../pulsar/MultiTopicPulsarSourceFunctionTest.java | 236 +++++++++
.../flink/pulsar/PulsarTestMetaManagerUtil.java | 88 ++++
.../sort/flink/pulsar/TestSourceContext.java | 71 +++
.../inlong/sort/util/TestMetaManagerUtil.java | 65 +++
19 files changed, 2071 insertions(+), 8 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 83ae849..f40b41e 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -53,6 +53,7 @@
<snappy.version>1.1.4</snappy.version>
<hadoop.version>2.8.5</hadoop.version>
<hive.version>2.3.8</hive.version>
+ <pulsar.version>2.6.2</pulsar.version>
</properties>
<modules>
@@ -305,6 +306,18 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index aa9391e..029cc77 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -48,6 +48,11 @@ public class Constants {
*/
public static final String DATA_TIME_FIELD = "dt";
+ /**
+ * The prefix of Pulsar source configuration keys.
+ */
+ public static final String PULSAR_SOURCE_PREFIX = "pulsar.source.";
+
// ------------------------------------------------------------------------
// Common configs
// ------------------------------------------------------------------------
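Keys carrying this prefix are stripped and handed to the Pulsar consumer configuration by putMapToConfig in MultiTopicPulsarSourceFunction further down this commit. A minimal sketch of the convention (the key below is illustrative):

    String key = "pulsar.source.consumer.receive-queue-size";
    if (key.startsWith(Constants.PULSAR_SOURCE_PREFIX)) {
        // yields "consumer.receive-queue-size", a PulsarOptions key
        String consumerKey = key.replaceFirst(Constants.PULSAR_SOURCE_PREFIX, "");
    }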
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
index 22b1811..923e309 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
@@ -19,11 +19,16 @@ package org.apache.inlong.sort.protocol;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
-import org.apache.inlong.sort.protocol.source.SourceInfo;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
/**
* Data flow protocol.
@@ -38,14 +43,26 @@ public class DataFlowInfo implements Serializable {
private final SinkInfo sinkInfo;
+ @JsonInclude(Include.NON_NULL)
+ private final Map<String, Object> properties;
+
@JsonCreator
public DataFlowInfo(
@JsonProperty("id") long id,
@JsonProperty("source_info") SourceInfo sourceInfo,
- @JsonProperty("sink_info") SinkInfo sinkInfo) {
+ @JsonProperty("sink_info") SinkInfo sinkInfo,
+ @JsonProperty("properties") Map<String, Object> properties) {
this.id = id;
this.sourceInfo = checkNotNull(sourceInfo);
this.sinkInfo = checkNotNull(sinkInfo);
+ this.properties = properties == null ? new HashMap<>() : properties;
+ }
+
+ public DataFlowInfo(long id, SourceInfo sourceInfo, SinkInfo sinkInfo) {
+ this.id = id;
+ this.sourceInfo = sourceInfo;
+ this.sinkInfo = sinkInfo;
+ this.properties = new HashMap<>();
}
@JsonProperty("id")
@@ -62,4 +79,27 @@ public class DataFlowInfo implements Serializable {
public SinkInfo getSinkInfo() {
return sinkInfo;
}
+
+ @JsonProperty("properties")
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DataFlowInfo other = (DataFlowInfo) o;
+
+ return Objects.equals(id, other.id)
+ && Objects.equals(sourceInfo, other.sourceInfo)
+ && Objects.equals(sinkInfo, other.sinkInfo)
+ && Objects.equals(properties, other.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, sourceInfo, sinkInfo, properties);
+ }
}
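For illustration, a hedged sketch of building a DataFlowInfo with the new properties map; sourceInfo and sinkInfo stand for concrete SourceInfo/SinkInfo instances built elsewhere:

    Map<String, Object> properties = new HashMap<>();
    properties.put("pulsar.source.consumer.bootstrap-mode", "earliest");
    DataFlowInfo dataFlowInfo = new DataFlowInfo(1L, sourceInfo, sinkInfo, properties);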
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
index c5da028..e669ec0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.source;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Objects;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -71,4 +72,22 @@ public class PulsarSourceInfo extends SourceInfo {
public String getSubscriptionName() {
return subscriptionName;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ PulsarSourceInfo other = (PulsarSourceInfo) o;
+ return super.equals(other)
+ && Objects.equals(adminUrl, other.adminUrl)
+ && Objects.equals(serviceUrl, other.serviceUrl)
+ && Objects.equals(subscriptionName, other.subscriptionName)
+ && Objects.equals(topic, other.topic);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), adminUrl, serviceUrl, subscriptionName, topic);
+ }
}
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 216f9b6..1b7cadd 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -163,6 +163,38 @@
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>1.15.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-api</artifactId>
<version>${project.version}</version>
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java
new file mode 100644
index 0000000..37441ce
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
+
+public class SourceEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SourceEventType sourceEventType;
+
+ private final long dataFlowId;
+
+ private final SourceInfo sourceInfo;
+
+ private final Map<String, Object> properties;
+
+ public SourceEvent(SourceEventType sourceEventType, long dataFlowId, PulsarSourceInfo pulsarSourceInfo,
+ Map<String, Object> properties) {
+ this.sourceEventType = sourceEventType;
+ this.dataFlowId = dataFlowId;
+ this.sourceInfo = checkNotNull(pulsarSourceInfo);
+ this.properties = properties;
+ }
+
+ public SourceEventType getSourceEventType() {
+ return sourceEventType;
+ }
+
+ public SourceInfo getSourceInfo() {
+ return sourceInfo;
+ }
+
+ public long getDataFlowId() {
+ return dataFlowId;
+ }
+
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("sourceEventType", sourceEventType)
+ .append("dataFlowId", dataFlowId)
+ .append("sourceInfo", sourceInfo)
+ .append("properties", properties)
+ .toString();
+ }
+
+ public enum SourceEventType {
+ ADDED,
+ UPDATE,
+ REMOVED
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
index fb71fd9..3d2a39f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
@@ -171,7 +171,8 @@ public class HiveMultiTenantWriter extends ProcessFunction<SerializedRecord, Par
HiveWriter hiveWriter = new HiveWriter(configuration, dataFlowId, hiveSinkInfo);
hiveWriter.setRuntimeContext(getRuntimeContext());
hiveWriter.initializeState(
- new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext));
+ new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext,
+ getRuntimeContext().getExecutionConfig()));
hiveWriter.open(new org.apache.flink.configuration.Configuration());
hiveWriters.put(dataFlowId, hiveWriter);
recordTransformer.addDataFlow(dataFlowInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
index 0dc355e..ab7a076 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
@@ -41,9 +42,11 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
private final MultiTenantOperatorStateStore tenantOperatorStateStore;
- public MultiTenantFunctionInitializationContext(long tenantId, FunctionInitializationContext parentContext) {
+ public MultiTenantFunctionInitializationContext(long tenantId, FunctionInitializationContext parentContext,
+ ExecutionConfig executionConfig) {
this.parentContext = checkNotNull(parentContext);
- tenantOperatorStateStore = new MultiTenantOperatorStateStore(tenantId, parentContext.getOperatorStateStore());
+ tenantOperatorStateStore = new MultiTenantOperatorStateStore(tenantId, parentContext.getOperatorStateStore(),
+ executionConfig);
}
@Override
@@ -73,9 +76,13 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
private final OperatorStateStore operatorStateStore;
- public MultiTenantOperatorStateStore(long tenantId, OperatorStateStore operatorStateStore) {
+ private final ExecutionConfig executionConfig;
+
+ public MultiTenantOperatorStateStore(long tenantId, OperatorStateStore operatorStateStore,
+ ExecutionConfig executionConfig) {
this.tenantId = tenantId;
this.operatorStateStore = checkNotNull(operatorStateStore);
+ this.executionConfig = executionConfig;
}
@Override
@@ -86,12 +93,14 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+ serializerInit(listStateDescriptor);
return operatorStateStore
.getListState(new MultiTenantListStateDescriptor<>(tenantId, listStateDescriptor));
}
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+ serializerInit(listStateDescriptor);
return operatorStateStore
.getUnionListState(new MultiTenantListStateDescriptor<>(tenantId, listStateDescriptor));
}
@@ -111,6 +120,7 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
@Override
public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+ serializerInit(listStateDescriptor);
return operatorStateStore
.getListState(new MultiTenantListStateDescriptor<S>(tenantId, listStateDescriptor));
}
@@ -119,6 +129,12 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
return operatorStateStore.getSerializableListState(getTenantStateName(tenantId, stateName));
}
+
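+ // State descriptors created by per-tenant functions are never registered with the
+ // runtime, so their serializers must be initialized manually before first use.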
+ private <S> void serializerInit(ListStateDescriptor<S> listStateDescriptor) {
+ if (!listStateDescriptor.isSerializerInitialized()) {
+ listStateDescriptor.initializeSerializerUnlessSet(executionConfig);
+ }
+ }
}
private static class MultiTenantListStateDescriptor<T> extends ListStateDescriptor<T> {
@@ -130,4 +146,4 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
listStateDescriptor.getElementSerializer());
}
}
-}
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java
new file mode 100644
index 0000000..499f4d0
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTenancyPulsarConsumer<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MultiTenancyPulsarConsumer.class);
+
+ private final ConcurrentMap<Long, PulsarPullThread> pulsarPullThreads;
+
+ private final Map<Long, PulsarSourceFunction<T>> pulsarSourceFunctions;
+
+ private final Consumer<Throwable> exceptionCatcher;
+
+ private SourceContext<T> context;
+
+ public MultiTenancyPulsarConsumer(Consumer<Throwable> exceptionCatcher) {
+ this.exceptionCatcher = checkNotNull(exceptionCatcher);
+ this.pulsarPullThreads = new ConcurrentHashMap<>();
+ this.pulsarSourceFunctions = new HashMap<>();
+ }
+
+ public void start(SourceContext<T> context) throws Exception {
+ this.context = checkNotNull(context);
+ }
+
+ public void cancel() {
+ for (PulsarPullThread pulsarPullThread : pulsarPullThreads.values()) {
+ pulsarPullThread.cancel();
+ pulsarPullThread.interrupt();
+ }
+ }
+
+ /**
+ * Releases all pull threads and source functions.
+ *
+ * @throws Exception if releasing resources fails
+ */
+ public void close() throws Exception {
+ MetaManager.release();
+ cancel();
+ for (PulsarPullThread pulsarPullThread : pulsarPullThreads.values()) {
+ try {
+ pulsarPullThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Could not cancel thread {}", pulsarPullThread.getName());
+ }
+ }
+ pulsarPullThreads.clear();
+
+ for (PulsarSourceFunction<T> sourceFunction : pulsarSourceFunctions.values()) {
+ try {
+ sourceFunction.close();
+ } catch (Throwable throwable) {
+ LOG.warn("Could not properly shutdown the pulsar pull consumer.", throwable);
+ }
+ }
+ pulsarSourceFunctions.clear();
+ }
+
+ /**
+ * Add a new pulsar source.
+ */
+ public void addPulsarSource(long dataflowId, PulsarSourceFunction<T> pulsarSourceFunction) throws Exception {
+ if (pulsarPullThreads.containsKey(dataflowId)) {
+ LOG.warn("Pull thread of dataflow-id {} has already been started", dataflowId);
+ return;
+ }
+ PulsarPullThread pulsarPullThread = new PulsarPullThread(pulsarSourceFunction);
+ pulsarPullThread.setName("PulsarPullThread-for-dataflowId-" + dataflowId);
+ pulsarPullThread.start();
+ pulsarPullThreads.put(dataflowId, pulsarPullThread);
+ pulsarSourceFunctions.put(dataflowId, pulsarSourceFunction);
+
+ LOG.info("Add pulsar source \"{}\" successfully!", dataflowId);
+ }
+
+ public void updatePulsarSource(long dataflowId, PulsarSourceFunction<T> pulsarSourceFunction) throws Exception {
+ removePulsarSource(dataflowId);
+ addPulsarSource(dataflowId, pulsarSourceFunction);
+ }
+
+ public void removePulsarSource(long dataflowId) {
+ PulsarPullThread pulsarPullThread = pulsarPullThreads.get(dataflowId);
+ if (pulsarPullThread != null) {
+ pulsarPullThread.cancel();
+ pulsarPullThread.interrupt();
+ try {
+ pulsarPullThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Could not cancel thread {}", pulsarPullThread.getName());
+ }
+ pulsarPullThreads.remove(dataflowId);
+ }
+ PulsarSourceFunction<T> sourceFunction = pulsarSourceFunctions.get(dataflowId);
+ if (sourceFunction != null) {
+ try {
+ sourceFunction.close();
+ } catch (Exception e) {
+ LOG.error("Could not properly shutdown the pulsar source function.", e);
+ }
+ pulsarSourceFunctions.remove(dataflowId);
+ }
+ }
+
+ public Collection<PulsarSourceFunction<T>> getCurrentSourceFunction() {
+ return pulsarSourceFunctions.values();
+ }
+
+ public class PulsarPullThread extends Thread {
+
+ private volatile boolean running = true;
+ private final PulsarSourceFunction<T> sourceFunction;
+
+ public PulsarPullThread(PulsarSourceFunction<T> sourceFunction) {
+ this.sourceFunction = checkNotNull(sourceFunction);
+ }
+
+ @Override
+ public void run() {
+ try {
+ sourceFunction.run(context);
+ } catch (InterruptedException e) {
+ // ignore interruption from cancelling
+ if (running) {
+ exceptionCatcher.accept(e);
+ LOG.warn("pulsar pull thread has been interrupted.", e);
+ }
+ } catch (Throwable t) {
+ exceptionCatcher.accept(t);
+ LOG.warn("Error occurred in pulsar pull thread.", t);
+ } finally {
+ LOG.info("pulsar pull thread stops");
+ }
+ }
+
+ public void cancel() {
+ this.running = false;
+ sourceFunction.cancel();
+ }
+ }
+}
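A rough usage sketch of this consumer; the SourceContext, dataFlowId, and PulsarSourceFunction are assumed to come from the enclosing Flink source, as in MultiTopicPulsarSourceFunction below:

    MultiTenancyPulsarConsumer<SerializedRecord> consumer =
            new MultiTenancyPulsarConsumer<>(t -> LOG.error("Pulsar consumer failed", t));
    consumer.start(sourceContext);                        // forward the Flink SourceContext
    consumer.addPulsarSource(dataFlowId, sourceFunction); // spawns a pull thread for the flow
    // ... later, on dataflow removal or shutdown:
    consumer.removePulsarSource(dataFlowId);
    consumer.close();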
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java
new file mode 100644
index 0000000..4def061
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.flink.SourceEvent;
+import org.apache.inlong.sort.flink.SourceEvent.SourceEventType;
+import org.apache.inlong.sort.flink.multitenant.MultiTenantFunctionInitializationContext;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTopicPulsarSourceFunction extends RichParallelSourceFunction<SerializedRecord> implements
+ CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(MultiTopicPulsarSourceFunction.class);
+
+ private final Configuration configuration;
+ private volatile boolean running = true;
+ private volatile Throwable throwable;
+
+ private transient MetaManager metaManager;
+ private transient MultiTenancyPulsarConsumer<SerializedRecord> pulsarConsumer;
+ private transient FunctionInitializationContext functionInitializationContext;
+ private transient BlockingQueue<SourceEvent> eventQueue;
+
+ public MultiTopicPulsarSourceFunction(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+ super.open(parameters);
+ eventQueue = new ArrayBlockingQueue<>(configuration.getInteger(Constants.SOURCE_EVENT_QUEUE_CAPACITY));
+ this.metaManager = MetaManager.getInstance(configuration);
+ this.metaManager.registerDataFlowInfoListener(new PulsarDataFlowInfoListener());
+ this.pulsarConsumer = new MultiTenancyPulsarConsumer<>(throwable -> this.throwable = throwable);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ this.functionInitializationContext = functionInitializationContext;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+
+ Collection<PulsarSourceFunction<SerializedRecord>> currentSourceFunction = pulsarConsumer
+ .getCurrentSourceFunction();
+ for (PulsarSourceFunction<SerializedRecord> sourceFunction : currentSourceFunction) {
+ sourceFunction.snapshotState(functionSnapshotContext);
+ }
+ }
+
+ @Override
+ public void run(SourceContext<SerializedRecord> sourceContext) throws Exception {
+ pulsarConsumer.start(sourceContext);
+ while (running) {
+ if (throwable != null) {
+ throw new RuntimeException(throwable);
+ }
+ SourceEvent sourceEvent = eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+ if (sourceEvent != null) {
+ processEvent(sourceEvent);
+ }
+ if (!running) {
+ return;
+ }
+ }
+ }
+
+ private void processEvent(SourceEvent sourceEvent) throws Exception {
+ SourceEventType sourceEventType = sourceEvent.getSourceEventType();
+ PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceEvent.getSourceInfo();
+ Map<String, Object> properties = sourceEvent.getProperties();
+ long dataFlowId = sourceEvent.getDataFlowId();
+
+ switch (sourceEventType) {
+ case ADDED:
+ PulsarSourceFunction<SerializedRecord> pulsarSourceFunction = generateSourceFunction(dataFlowId,
+ properties, pulsarSourceInfo);
+ pulsarConsumer.addPulsarSource(dataFlowId, pulsarSourceFunction);
+ break;
+ case UPDATE:
+ PulsarSourceFunction<SerializedRecord> updateSourceFunction = generateSourceFunction(dataFlowId,
+ properties, pulsarSourceInfo);
+ pulsarConsumer.updatePulsarSource(dataFlowId, updateSourceFunction);
+ break;
+ case REMOVED:
+ pulsarConsumer.removePulsarSource(dataFlowId);
+ break;
+ default:
+ LOG.error("Unknown source event type {}", sourceEvent.getSourceEventType());
+ throw new RuntimeException("Unknown source event type " + sourceEvent.getSourceEventType());
+ }
+ }
+
+ public PulsarSourceFunction<SerializedRecord> generateSourceFunction(
+ long dataFlowId,
+ Map<String, Object> properties,
+ PulsarSourceInfo pulsarSourceInfo) throws Exception {
+ org.apache.flink.configuration.Configuration config =
+ new org.apache.flink.configuration.Configuration();
+ putMapToConfig(config, properties);
+ PulsarSourceFunction<SerializedRecord> pulsarSourceFunction = new PulsarSourceFunction<>(
+ pulsarSourceInfo.getAdminUrl(),
+ pulsarSourceInfo.getServiceUrl(),
+ pulsarSourceInfo.getTopic(),
+ pulsarSourceInfo.getSubscriptionName(),
+ new SerializedRecordDeserializationSchema(dataFlowId),
+ config);
+ pulsarSourceFunction.setRuntimeContext(getRuntimeContext());
+ pulsarSourceFunction.initializeState(
+ new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext,
+ getRuntimeContext().getExecutionConfig()
+ ));
+ pulsarSourceFunction.open(config);
+ return pulsarSourceFunction;
+ }
+
+ public void putMapToConfig(org.apache.flink.configuration.Configuration config, Map<String, Object> properties) {
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ if (entry.getKey().startsWith(Constants.PULSAR_SOURCE_PREFIX)) {
+ config.setString(entry.getKey().replaceFirst(Constants.PULSAR_SOURCE_PREFIX, ""),
+ entry.getValue().toString());
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.running = false;
+ pulsarConsumer.cancel();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ pulsarConsumer.close();
+ MetaManager.release();
+ }
+
+ private class PulsarDataFlowInfoListener implements MetaManager.DataFlowInfoListener {
+
+ public void queueEvent(SourceEventType eventType, DataFlowInfo dataFlowInfo) throws InterruptedException {
+ if (dataFlowInfo.getSourceInfo() instanceof PulsarSourceInfo) {
+ eventQueue.put(new SourceEvent(eventType, dataFlowInfo.getId(),
+ (PulsarSourceInfo) dataFlowInfo.getSourceInfo(), dataFlowInfo.getProperties()));
+ } else {
+ LOG.warn("Received a non-pulsar data flow notification {}: {}", eventType, dataFlowInfo);
+ }
+ }
+
+ @Override
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ queueEvent(SourceEventType.ADDED, dataFlowInfo);
+ }
+
+ @Override
+ public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ queueEvent(SourceEventType.UPDATE, dataFlowInfo);
+ }
+
+ @Override
+ public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ queueEvent(SourceEventType.REMOVED, dataFlowInfo);
+ }
+ }
+}
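A hedged sketch of wiring this source into a Flink job; sortConfiguration stands for an org.apache.inlong.sort.configuration.Configuration built elsewhere:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<SerializedRecord> records = env
            .addSource(new MultiTopicPulsarSourceFunction(sortConfiguration))
            .name("multi-topic-pulsar-source");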
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java
new file mode 100644
index 0000000..5d03987
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ class DeserializationResult<T> {
+ private final T record;
+ // the data length is kept here to avoid fetching the payload from the Message multiple times
+ private final long dataLength;
+
+ private DeserializationResult(T record, long length) {
+ this.record = record;
+ this.dataLength = length;
+ }
+
+ public T getRecord() {
+ return record;
+ }
+
+ public long getDataLength() {
+ return dataLength;
+ }
+
+ public static <T> DeserializationResult<T> of(T record, long dataLength) {
+ return new DeserializationResult<>(record, dataLength);
+ }
+ }
+
+ DeserializationResult<T> deserialize(@SuppressWarnings("rawtypes") Message message) throws IOException;
+
+}
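For illustration, a minimal pass-through implementation of this interface; the class name is hypothetical (the commit's real implementation is SerializedRecordDeserializationSchema):

    package org.apache.inlong.sort.flink.pulsar;

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.pulsar.client.api.Message;

    public class RawBytesDeserializationSchema implements PulsarDeserializationSchema<byte[]> {

        @Override
        public DeserializationResult<byte[]> deserialize(Message message) throws IOException {
            // emit the raw payload and report its length in one pass over the Message
            byte[] data = message.getData();
            return DeserializationResult.of(data, data.length);
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return TypeInformation.of(byte[].class);
        }
    }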
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java
new file mode 100644
index 0000000..a209820
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for Pulsar sources and sinks.
+ */
+public class PulsarOptions {
+
+ public static final String BOOTSTRAP_MODE_LATEST = "latest";
+ public static final String BOOTSTRAP_MODE_EARLIEST = "earliest";
+
+ public static final ConfigOption<String> OPERATION_TIMEOUT =
+ ConfigOptions.key("operation-timeout")
+ .defaultValue("30s")
+ .withDescription("Duration of waiting for completing an operation.");
+
+ public static final ConfigOption<String> CONNECTION_TIMEOUT =
+ ConfigOptions.key("connection-timeout")
+ .defaultValue("10s")
+ .withDescription("Duration of waiting for a connection to a broker to be established.");
+
+ public static final ConfigOption<String> REQUEST_TIMEOUT =
+ ConfigOptions.key("request-timeout")
+ .defaultValue("60s")
+ .withDescription("Duration of waiting for completing a request.");
+
+ public static final ConfigOption<String> KEEPALIVE_INTERVAL =
+ ConfigOptions.key("keepalive-interval")
+ .defaultValue("30s")
+ .withDescription("Duration of keeping alive interval for each client broker connection.");
+
+ public static final ConfigOption<String> CONSUMER_BOOTSTRAP_MODE =
+ ConfigOptions.key("consumer.bootstrap-mode")
+ .defaultValue(BOOTSTRAP_MODE_LATEST)
+ .withDescription("The behavior when the consumer bootstraps. This only takes effect when not "
+ + "restoring from checkpoints.");
+
+ public static final ConfigOption<Integer> CONSUMER_RECEIVE_QUEUE_SIZE =
+ ConfigOptions.key("consumer.receive-queue-size")
+ .defaultValue(1000)
+ .withDescription("The size of a consumer's receiver queue.");
+
+ public static final ConfigOption<String> CONSUMER_RECEIVE_TIMEOUT =
+ ConfigOptions.key("consumer.receive-timeout")
+ .defaultValue("120s")
+ .withDescription("The timeout in each receiving from pulsar.");
+
+ public static final ConfigOption<String> CONSUMER_CHECK_PARTITION_INTERVAL =
+ ConfigOptions.key("consumer.check-partition-interval")
+ .defaultValue("5s")
+ .withDescription("Duration of checking interval for each partition reader.");
+
+ public static final ConfigOption<String> CONSUMER_MAX_IDLE_TIME =
+ ConfigOptions.key("consumer.max-idle-time")
+ .defaultValue("60s")
+ .withDescription("The max idle time for the pulsar consumer.");
+
+ public static final ConfigOption<Boolean> CONSUMER_ENABLE_AUTO_COMMIT =
+ ConfigOptions.key("consumer.enable-auto-commit")
+ .defaultValue(true)
+ .withDescription("True if the consumer is enabled to automatically commit offsets.");
+
+ public static final ConfigOption<String> CONSUMER_AUTO_COMMIT_INTERVAL =
+ ConfigOptions.key("consumer.auto-commit-interval")
+ .defaultValue("120s")
+ .withDescription("The interval for consumers to commit offset if automatically commit is enabled.");
+
+ public static final ConfigOption<String> PRODUCER_ROUTE_MODE =
+ ConfigOptions.key("producer.route-mode")
+ .defaultValue("RoundRobinPartition")
+ .withDescription("Message routing logic for producers on partitioned topics.");
+
+ public static final ConfigOption<Integer> PRODUCER_PENDING_QUEUE_SIZE =
+ ConfigOptions.key("producer.pending-queue-size")
+ .defaultValue(1000)
+ .withDescription("The maximum size of a queue holding pending messages.");
+
+ public static final ConfigOption<Integer> PRODUCER_PENDING_SIZE =
+ ConfigOptions.key("producer.pending-total-size")
+ .defaultValue(50000)
+ .withDescription("The maximum number of pending messages across partitions.");
+
+ public static final ConfigOption<Boolean> PRODUCER_BLOCK_QUEUE_FULL =
+ ConfigOptions.key("producer.block-if-queue-full")
+ .defaultValue(true)
+ .withDescription("When the queue is full, the method is blocked "
+ + "instead of an exception is thrown.");
+
+ public static final ConfigOption<String> PRODUCER_SEND_TIMEOUT =
+ ConfigOptions.key("producer.send-timeout")
+ .defaultValue("30s")
+ .withDescription("The timeout in each sending to pulsar.");
+
+ public static final ConfigOption<Boolean> SOURCE_DISABLE_CHAINING =
+ ConfigOptions.key("source.disable-chaining")
+ .defaultValue(false)
+ .withDescription("The source operator will not chain with downstream if true.");
+
+ public static final ConfigOption<Boolean> SINK_START_NEW_CHAIN =
+ ConfigOptions.key("sink.start-new-chain")
+ .defaultValue(true)
+ .withDescription("The sink operator will start a new chain if true.");
+}
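These options are read from the Flink Configuration passed to PulsarSourceFunction; a small sketch of overriding a few of them:

    Configuration pulsarConf = new Configuration(); // org.apache.flink.configuration.Configuration
    pulsarConf.setString(PulsarOptions.CONSUMER_BOOTSTRAP_MODE, PulsarOptions.BOOTSTRAP_MODE_EARLIEST);
    pulsarConf.setInteger(PulsarOptions.CONSUMER_RECEIVE_QUEUE_SIZE, 2000);
    pulsarConf.setString(PulsarOptions.CONSUMER_RECEIVE_TIMEOUT, "60s");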
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java
new file mode 100644
index 0000000..daff1d1
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.flink.pulsar.PulsarDeserializationSchema.DeserializationResult;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pulsar source (consumer) which receives messages from topics.
+ *
+ * <p>
+ * Copied from the Flink Pulsar connector with some changes: we need the whole Pulsar Message when
+ * deserializing, so we fork this source function until the Flink connector satisfies that requirement.
+ * </p>
+ */
+public class PulsarSourceFunction<T>
+ extends RichParallelSourceFunction<T>
+ implements ResultTypeQueryable<T>, CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PulsarSourceFunction.class);
+
+ private static final String OFFSETS_STATE_NAME = "pulsar-offset-state";
+ private static final String READER_THREAD_NAME_PREFIX = "pulsar-reader-";
+ private static final String COMMITTER_THREAD_NAME = "pulsar-committer";
+ private static final String SUBSCRIPTION_NAME_OPTION_KEY = "subscriptionName";
+
+ /**
+ * The pulsar admin console address.
+ */
+ private final String adminUrl;
+
+ /**
+ * The pulsar service provider address.
+ */
+ private final String serviceUrl;
+
+ /**
+ * The pulsar topic name.
+ */
+ private final String topic;
+
+ /**
+ * The Pulsar consumer group, used as the subscription name.
+ */
+ private final String consumerGroup;
+
+ /**
+ * The configuration for the pulsar consumer.
+ */
+ private final Configuration configuration;
+
+ /**
+ * The deserializer for records.
+ */
+ private final PulsarDeserializationSchema<T> deserializationSchema;
+
+ /**
+ * The pulsar client.
+ */
+ private transient PulsarClient client;
+
+ /**
+ * The pulsar admin.
+ */
+ private transient PulsarAdmin admin;
+
+ /**
+ * The readers for assigned partitions.
+ *
+ * <p>NOTE: The map can only be accessed in the task thread.</p>
+ */
+ private transient Map<String, PulsarPartitionReader> partitionReaders;
+
+ /**
+ * Accessor for state in the operator state backend.
+ */
+ private transient ListState<Tuple2<String, MessageId>> unionOffsetState;
+
+ /**
+ * The restored offsets of queues which are restored from flink state.
+ *
+ * <p>NOTE: The map can only be accessed in the task thread.</p>
+ */
+ private transient Map<String, MessageId> restoredOffsets;
+
+ /**
+ * The current offsets of queues which are stored in flink state
+ * once a checkpoint is triggered.
+ *
+ * <p>NOTE: The map should be guarded by the checkpoint lock.</p>
+ */
+ private transient Map<String, MessageId> currentOffsets;
+
+ /**
+ * Flag indicating whether the consumer is still running.
+ **/
+ private transient volatile boolean isRunning;
+
+ /**
+ * The timeout for the reader's pulling.
+ */
+ private transient Duration receiveTimeout;
+
+ /**
+ * The last committed offsets for topic partition.
+ */
+ private transient Map<String, MessageId> lastCommittedOffsets;
+
+ /**
+ * The scheduler for auto commit tasks.
+ */
+ private transient ScheduledExecutorService executor;
+
+ public PulsarSourceFunction(
+ String adminUrl,
+ String serviceUrl,
+ String topic,
+ String consumerGroup,
+ PulsarDeserializationSchema<T> deserializationSchema,
+ Configuration configuration
+ ) {
+ Preconditions.checkNotNull(topic,
+ "The topic must not be null.");
+ Preconditions.checkNotNull(consumerGroup,
+ "The consumer group must not be null.");
+ Preconditions.checkNotNull(serviceUrl,
+ "The service url must not be null.");
+ Preconditions.checkNotNull(adminUrl,
+ "The admin url must not be null.");
+ Preconditions.checkNotNull(deserializationSchema,
+ "The deserialization schema must not be null.");
+ Preconditions.checkNotNull(configuration,
+ "The configuration must not be null.");
+
+ this.adminUrl = adminUrl;
+ this.serviceUrl = serviceUrl;
+ this.topic = topic;
+ this.consumerGroup = consumerGroup;
+ this.configuration = configuration;
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ super.open(parameters);
+
+ LOG.info("Opening the pulsar source: adminUrl: {}, serviceUrl: {}, topic: {}, consumer group: {}.",
+ adminUrl, serviceUrl, topic, consumerGroup);
+
+ LOG.info("Pulsar source configuration: {}", configuration);
+
+ admin = PulsarUtils.createAdmin(adminUrl);
+ client = PulsarUtils.createClient(serviceUrl, configuration);
+ partitionReaders = new HashMap<>();
+ currentOffsets = new HashMap<>();
+ lastCommittedOffsets = new HashMap<>();
+
+ receiveTimeout =
+ parseDuration(configuration.getString(
+ PulsarOptions.CONSUMER_RECEIVE_TIMEOUT));
+
+ isRunning = true;
+ }
+
+ @Override
+ public void run(SourceContext<T> context) throws Exception {
+
+ int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+ boolean isAutoCommitEnabled =
+ configuration.getBoolean(PulsarOptions.CONSUMER_ENABLE_AUTO_COMMIT);
+ if (isAutoCommitEnabled) {
+ Duration autoCommitInterval =
+ parseDuration(configuration.getString(
+ PulsarOptions.CONSUMER_AUTO_COMMIT_INTERVAL));
+
+ ThreadFactory threadFactory =
+ new ExecutorThreadFactory(COMMITTER_THREAD_NAME);
+ executor =
+ Executors.newScheduledThreadPool(1, threadFactory);
+ executor.scheduleWithFixedDelay(
+ new AutoCommitTask(context),
+ autoCommitInterval.toMillis(),
+ autoCommitInterval.toMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ Duration checkPartitionInterval =
+ parseDuration(configuration.getString(
+ PulsarOptions.CONSUMER_CHECK_PARTITION_INTERVAL));
+ Duration maxIdleTime =
+ parseDuration(configuration.getString(
+ PulsarOptions.CONSUMER_MAX_IDLE_TIME));
+
+ Instant lastEmitInstant = Instant.MIN;
+
+ // set subscriptionName
+ HashMap<String, Object> readerConf = new HashMap<>();
+ readerConf.putIfAbsent(SUBSCRIPTION_NAME_OPTION_KEY, consumerGroup);
+
+ while (isRunning) {
+
+ // Discover new partitions
+ Set<String> topicPartitions = PulsarUtils.getTopicPartitions(admin, topic);
+
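+ // Assign each partition to exactly one subtask: mask the hash to an unsigned
+ // 32-bit value so the modulo is never negative, then match this subtask's index.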
+ Set<String> assignedTopicPartitions =
+ topicPartitions.stream()
+ .filter(p -> (((((long) p.hashCode()) & 0xffffffffL) % numTasks) == taskIndex))
+ .collect(Collectors.toCollection(TreeSet::new));
+
+ for (String topicPartition : assignedTopicPartitions) {
+ if (!partitionReaders.containsKey(topicPartition)) {
+
+ MessageId bootstrapOffset =
+ getBootstrapOffset(topicPartition);
+
+ LOG.info("Discover a new topic partition {} starting from offset {}.", topicPartition,
+ bootstrapOffset);
+
+ Reader<byte[]> reader =
+ PulsarUtils.createReader(
+ client,
+ topicPartition,
+ bootstrapOffset,
+ configuration,
+ readerConf
+ );
+
+ PulsarPartitionReader partitionReader =
+ new PulsarPartitionReader(
+ context,
+ topicPartition,
+ reader
+ );
+ partitionReaders.put(topicPartition, partitionReader);
+
+ partitionReader.start();
+ }
+ }
+
+ // Check the status of each partition reader
+ for (PulsarPartitionReader partitionReader :
+ partitionReaders.values()) {
+
+ Throwable partitionThrowable = partitionReader.getThrowable();
+ if (partitionThrowable != null) {
+ throw new IOException("Could not properly read messages from topic partition "
+ + partitionReader.getTopicPartition() + ".", partitionThrowable);
+ }
+
+ Instant partitionLastEmitInstant =
+ partitionReader.getLastEmitInstant();
+ if (partitionLastEmitInstant.compareTo(lastEmitInstant) > 0) {
+ lastEmitInstant = partitionLastEmitInstant;
+ }
+ }
+
+ // Mark the consumer as idle if the idle time exceeds the limit.
+ Duration idleTime =
+ Duration.between(lastEmitInstant, Instant.now());
+ if (idleTime.compareTo(maxIdleTime) > 0) {
+ context.markAsTemporarilyIdle();
+ }
+
+ //noinspection BusyWait
+ Thread.sleep(checkPartitionInterval.toMillis());
+ }
+
+ // Waits until all readers exit
+ for (
+ PulsarPartitionReader partitionReader :
+ partitionReaders.values()
+ ) {
+ try {
+ partitionReader.join();
+ partitionReader.close();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly close the reader for topic partition {}.",
+ partitionReader.getTopicPartition(), t);
+ }
+ }
+ }
+
+ @Override
+ public void initializeState(
+ FunctionInitializationContext context
+ ) throws Exception {
+ OperatorStateStore stateStore = context.getOperatorStateStore();
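+ // Union list state: on restore, every subtask receives the offsets of all
+ // partitions; offsets of partitions this subtask does not own are simply unused.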
+ unionOffsetState = stateStore.getUnionListState(
+ new ListStateDescriptor<>(
+ OFFSETS_STATE_NAME,
+ TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
+ })
+ )
+ );
+
+ restoredOffsets = new TreeMap<>();
+ if (context.isRestored()) {
+ for (Tuple2<String, MessageId> restoredOffset :
+ unionOffsetState.get()) {
+ String topicPartition = restoredOffset.f0;
+ MessageId offset = restoredOffset.f1;
+
+ restoredOffsets.put(topicPartition, offset);
+ }
+
+ LOG.info("The pulsar source successfully restores the offsets ({}).", restoredOffsets);
+ } else {
+ LOG.info("The pulsar source does not restore from any checkpoint.");
+ }
+ }
+
+ @Override
+ public void snapshotState(
+ FunctionSnapshotContext context
+ ) throws Exception {
+
+ unionOffsetState.clear();
+ for (Map.Entry<String, MessageId> entry : currentOffsets.entrySet()) {
+ unionOffsetState.add(Tuple2.of(entry.getKey(), entry.getValue()));
+ }
+
+ LOG.info("Successfully save the offsets {} in checkpoint {}.",
+ currentOffsets, context.getCheckpointId());
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ cancel();
+
+ if (executor != null) {
+ try {
+ executor.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly shutdown the executor.", t);
+ }
+ }
+
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly close the pulsar client.", t);
+ }
+ }
+
+ if (admin != null) {
+ try {
+ admin.close();
+ } catch (Throwable t) {
+ LOG.warn("Could not properly close the pulsar admin.", t);
+ }
+ }
+
+ super.close();
+ }
+
+ private MessageId getBootstrapOffset(
+ String topicPartition
+ ) throws PulsarAdminException {
+ MessageId restoredOffset = restoredOffsets.get(topicPartition);
+ if (restoredOffset != null) {
+ return restoredOffset;
+ }
+
+ String bootstrapMode = configuration.getString(
+ PulsarOptions.CONSUMER_BOOTSTRAP_MODE);
+
+ switch (bootstrapMode) {
+ case PulsarOptions.BOOTSTRAP_MODE_EARLIEST:
+ return MessageId.earliest;
+ case PulsarOptions.BOOTSTRAP_MODE_LATEST:
+ return MessageId.latest;
+ default:
+ return PulsarUtils.getTopicPartitionOffset(admin, topicPartition, consumerGroup);
+ }
+ }
+
+ private class PulsarPartitionReader extends Thread implements AutoCloseable {
+
+ private final SourceContext<T> context;
+
+ private final String topicPartition;
+
+ private final Reader<byte[]> reader;
+
+ private volatile Throwable throwable;
+
+ private volatile Instant lastEmitInstant;
+
+ PulsarPartitionReader(
+ SourceContext<T> context,
+ String topicPartition,
+ Reader<byte[]> reader
+ ) {
+ super(READER_THREAD_NAME_PREFIX + topicPartition);
+
+ this.context = context;
+ this.topicPartition = topicPartition;
+ this.reader = reader;
+ this.throwable = null;
+ this.lastEmitInstant = Instant.MIN;
+ }
+
+ String getTopicPartition() {
+ return topicPartition;
+ }
+
+ Throwable getThrowable() {
+ return throwable;
+ }
+
+ Instant getLastEmitInstant() {
+ return lastEmitInstant;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.info("The reader for {} starts.", topicPartition);
+
+ while (isRunning) {
+ //noinspection rawtypes
+ Message message =
+ reader.readNext(
+ (int) receiveTimeout.toMillis(),
+ TimeUnit.MILLISECONDS
+ );
+
+ if (message != null) {
+ String topicPartition = message.getTopicName();
+ MessageId offset = message.getMessageId();
+
+ DeserializationResult<T> result = deserializationSchema.deserialize(message);
+
+ synchronized (context.getCheckpointLock()) {
+ context.collect(result.getRecord());
+ currentOffsets.put(topicPartition, offset);
+ }
+
+ lastEmitInstant = Instant.now();
+ } else {
+ LOG.debug("The reader read is null, maybe it reached the end of the topic:{}", topicPartition);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Error occurred of partition reader", t);
+ throwable = t;
+ } finally {
+ LOG.info("The reader for {} exits.", topicPartition);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ reader.close();
+ }
+ }
+
+ private class AutoCommitTask implements Runnable {
+
+ private final SourceContext<T> context;
+
+ private AutoCommitTask(SourceContext<T> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ if (!isRunning) {
+ return;
+ }
+
+ Map<String, MessageId> currentOffsetsCopy;
+ synchronized (context.getCheckpointLock()) {
+ currentOffsetsCopy = new HashMap<>(currentOffsets);
+ }
+
+ for (
+ Map.Entry<String, MessageId> entry :
+ currentOffsetsCopy.entrySet()
+ ) {
+ String topicPartition = entry.getKey();
+ MessageId currentOffset = entry.getValue();
+
+ MessageId lastCommittedOffset =
+ lastCommittedOffsets.get(topicPartition);
+
+ // Skips the committing if the offset is not changed.
+ if (Objects.equals(currentOffset, lastCommittedOffset)) {
+ continue;
+ }
+
+ PulsarUtils.commitTopicPartitionOffset(
+ admin,
+ topicPartition,
+ currentOffset,
+ consumerGroup
+ );
+
+ lastCommittedOffsets.put(topicPartition, currentOffset);
+ }
+ } catch (Throwable throwable) {
+ LOG.warn("Could not properly commit the offset.", throwable);
+ }
+ }
+ }
+}
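A hedged construction sketch; URLs, topic, and subscription are placeholders, and RawBytesDeserializationSchema is the pass-through sketch shown earlier:

    PulsarSourceFunction<byte[]> source = new PulsarSourceFunction<>(
            "http://localhost:8080",           // admin URL
            "pulsar://localhost:6650",         // service URL
            "persistent://public/default/in",  // topic
            "inlong-sort-sub",                 // consumer group / subscription name
            new RawBytesDeserializationSchema(),
            new Configuration());              // org.apache.flink.configuration.Configuration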
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java
new file mode 100644
index 0000000..a7e453a
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.CONNECTION_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.CONSUMER_RECEIVE_QUEUE_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.KEEPALIVE_INTERVAL;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.OPERATION_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_BLOCK_QUEUE_FULL;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_PENDING_QUEUE_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_PENDING_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_ROUTE_MODE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_SEND_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.REQUEST_TIMEOUT;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+import static org.apache.pulsar.common.naming.TopicName.getPartitionIndex;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for creating Pulsar clients, readers, producers and admin
+ * clients, and for reading and committing subscription offsets.
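+ *
+ * <p>A minimal usage sketch; the service URLs, topic and subscription names
+ * below are placeholders, and the subscription is assumed to already exist:
+ * <pre>{@code
+ * Configuration conf = new Configuration();
+ * PulsarClient client = PulsarUtils.createClient("pulsar://localhost:6650", conf);
+ * PulsarAdmin admin = PulsarUtils.createAdmin("http://localhost:8080");
+ * for (String partition : PulsarUtils.getTopicPartitions(admin, "my-topic")) {
+ *     MessageId offset = PulsarUtils.getTopicPartitionOffset(admin, partition, "my-group");
+ *     Reader<byte[]> reader = PulsarUtils.createReader(
+ *             client, partition, offset, conf, new HashMap<>());
+ * }
+ * }</pre>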
+ */
+public class PulsarUtils {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PulsarUtils.class);
+
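+ /**
+ * Creates a {@link PulsarClient}, applying the timeout and keep-alive
+ * settings from the given configuration.
+ */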
+ public static PulsarClient createClient(
+ String serviceUrl,
+ Configuration configuration
+ ) throws PulsarClientException {
+
+ Duration operationTimeout =
+ parseDuration(configuration.getString(OPERATION_TIMEOUT));
+ Duration connectionTimeout =
+ parseDuration(configuration.getString(CONNECTION_TIMEOUT));
+ Duration requestTimeout =
+ parseDuration(configuration.getString(REQUEST_TIMEOUT));
+ Duration keepAliveInterval =
+ parseDuration(configuration.getString(KEEPALIVE_INTERVAL));
+
+ ClientConfigurationData clientConfigurationData =
+ new ClientConfigurationData();
+ clientConfigurationData
+ .setServiceUrl(serviceUrl);
+ clientConfigurationData
+ .setOperationTimeoutMs(operationTimeout.toMillis());
+ clientConfigurationData
+ .setConnectionTimeoutMs((int) connectionTimeout.toMillis());
+ clientConfigurationData
+ .setRequestTimeoutMs((int) requestTimeout.toMillis());
+ clientConfigurationData
+ .setKeepAliveIntervalSeconds((int) keepAliveInterval.getSeconds());
+
+ return new PulsarClientImpl(clientConfigurationData);
+ }
+
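+ /**
+ * Creates a reader on a single topic partition, starting from the given
+ * bootstrap offset.
+ */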
+ public static Reader<byte[]> createReader(
+ PulsarClient client,
+ String topicPartition,
+ MessageId bootstrapOffset,
+ Configuration configuration,
+ Map<String, Object> readerConf
+ ) throws PulsarClientException {
+
+ int receiveQueueSize =
+ configuration.getInteger(CONSUMER_RECEIVE_QUEUE_SIZE);
+
+ return client
+ .newReader()
+ .topic(topicPartition)
+ .startMessageId(bootstrapOffset)
+ .receiverQueueSize(receiveQueueSize)
+ .loadConf(readerConf)
+ .create();
+ }
+
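+ /**
+ * Creates a producer for the given topic, applying the routing mode, send
+ * timeout and pending-message settings from the configuration.
+ */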
+ public static Producer<byte[]> createProducer(
+ PulsarClient client,
+ String topic,
+ Configuration configuration
+ ) throws PulsarClientException {
+ MessageRoutingMode routeMode = Enum.valueOf(MessageRoutingMode.class,
+ configuration.getString(PRODUCER_ROUTE_MODE));
+ Duration sendTimeout =
+ parseDuration(configuration.getString(PRODUCER_SEND_TIMEOUT));
+ int pendingQueueSize =
+ configuration.getInteger(PRODUCER_PENDING_QUEUE_SIZE);
+ int pendingSize =
+ configuration.getInteger(PRODUCER_PENDING_SIZE);
+ boolean blockIfQueueFull =
+ configuration.getBoolean(PRODUCER_BLOCK_QUEUE_FULL);
+
+ return client
+ .newProducer()
+ .topic(topic)
+ .messageRoutingMode(routeMode)
+ .sendTimeout((int) sendTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ .maxPendingMessages(pendingQueueSize)
+ .maxPendingMessagesAcrossPartitions(pendingSize)
+ .blockIfQueueFull(blockIfQueueFull)
+ .create();
+ }
+
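+ /**
+ * Creates a {@link PulsarAdmin} client for the given admin (HTTP) service URL.
+ */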
+ public static PulsarAdmin createAdmin(
+ String adminUrl
+ ) throws PulsarClientException {
+ return PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
+ }
+
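+ /**
+ * Returns the partitions of the given topic; for a non-partitioned topic,
+ * the topic itself is returned as the only element.
+ */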
+ public static Set<String> getTopicPartitions(
+ PulsarAdmin admin,
+ String topic
+ ) throws PulsarAdminException {
+ Set<String> topicPartitions = new TreeSet<>();
+
+ int numTopicPartitions =
+ admin.topics().getPartitionedTopicMetadata(topic).partitions;
+ if (numTopicPartitions == 0) {
+ topicPartitions.add(topic);
+ } else {
+ for (int i = 0; i < numTopicPartitions; ++i) {
+ topicPartitions.add(topic + PARTITIONED_TOPIC_SUFFIX + i);
+ }
+ }
+
+ return topicPartitions;
+ }
+
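+ /**
+ * Looks up the committed offset of the given subscription (consumer group)
+ * on one topic partition. The cursor's markDeletePosition is the last
+ * acknowledged position, so reading resumes at entryId + 1.
+ */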
+ public static MessageId getTopicPartitionOffset(
+ PulsarAdmin admin,
+ String topicPartition,
+ String consumerGroup
+ ) throws PulsarAdminException {
+ TopicStats topicStats = admin.topics().getStats(topicPartition);
+ if (topicStats.subscriptions.containsKey(consumerGroup)) {
+ SubscriptionStats subStats =
+ topicStats.subscriptions.get(consumerGroup);
+ if (!subStats.consumers.isEmpty()) {
+ throw new RuntimeException("The subscription is being actively used by other consumers; "
+ + "in this situation, exactly-once semantics cannot be guaranteed.");
+ } else {
+ PersistentTopicInternalStats.CursorStats cursorStats =
+ admin
+ .topics()
+ .getInternalStats(topicPartition)
+ .cursors
+ .get(consumerGroup);
+
+ String[] ids =
+ cursorStats.markDeletePosition.split(":", 2);
+ long ledgerId = Long.parseLong(ids[0]);
+ long entryId = Long.parseLong(ids[1]);
+ int partitionIndex = getPartitionIndex(topicPartition);
+
+ return new MessageIdImpl(
+ ledgerId,
+ entryId + 1,
+ partitionIndex
+ );
+ }
+ } else {
+ throw new RuntimeException("Could not find consumer group '" + consumerGroup + "' for topic partition "
+ + topicPartition + ".");
+ }
+ }
+
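+ /**
+ * Commits an offset by resetting the subscription cursor through the admin
+ * API. A failure caused by the topic having been removed is only logged.
+ */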
+ public static void commitTopicPartitionOffset(
+ PulsarAdmin admin,
+ String topicPartition,
+ MessageId offset,
+ String consumerGroup
+ ) {
+ try {
+ admin.topics().resetCursor(topicPartition, consumerGroup, offset);
+ } catch (Throwable e) {
+ if (e instanceof PulsarAdminException
+ && (((PulsarAdminException) e).getStatusCode() == 404
+ || ((PulsarAdminException) e).getStatusCode() == 412)) {
+ LOG.info("Cannot commit cursor since the topic {} has been deleted during execution", topicPartition);
+ } else {
+ throw new RuntimeException(String.format("Failed to commit cursor for %s", topicPartition), e);
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java
new file mode 100644
index 0000000..f1db602
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.pulsar.client.api.Message;
+
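+/**
+ * A {@link PulsarDeserializationSchema} that wraps the raw payload of each
+ * Pulsar message into a {@link SerializedRecord} tagged with the data flow id
+ * and the message event time.
+ */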
+public class SerializedRecordDeserializationSchema implements PulsarDeserializationSchema<SerializedRecord> {
+
+ private static final long serialVersionUID = -374382043299290093L;
+
+ private final long dataFlowId;
+
+ public SerializedRecordDeserializationSchema(long dataFlowId) {
+ this.dataFlowId = dataFlowId;
+ }
+
+ @Override
+ public DeserializationResult<SerializedRecord> deserialize(@SuppressWarnings("rawtypes") Message message)
+ throws IOException {
+ final byte[] data = message.getData();
+ return DeserializationResult.of(new SerializedRecord(dataFlowId, message.getEventTime(), data), data.length);
+ }
+
+ @Override
+ public TypeInformation<SerializedRecord> getProducedType() {
+ return TypeInformation.of(SerializedRecord.class);
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java
new file mode 100644
index 0000000..c1cfb96
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.util.CommonUtils;
+import org.apache.inlong.sort.util.TestMetaManagerUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Ignore
+public class MultiTopicPulsarSourceFunctionTest {
+
+ public static final String TEST_TOPIC = "test_topic";
+ public static final String CONSUMER_GROUP = "test";
+ private static final int TOTAL_COUNT = 5;
+ private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.2.0");
+ private PulsarContainer pulsar;
+ private TestMetaManagerUtil testMetaManagerUtil;
+ private StreamExecutionEnvironment env;
+
+ @Before
+ public void startPulsarContainer() throws Exception {
+ pulsar = new PulsarContainer(PULSAR_IMAGE);
+ pulsar.start();
+ producerMessageToPulsar(pulsar.getPulsarBrokerUrl());
+ testMetaManagerUtil = new PulsarTestMetaManagerUtil();
+ testMetaManagerUtil
+ .initDataFlowInfo(pulsar.getHttpServiceUrl(), pulsar.getPulsarBrokerUrl(), TEST_TOPIC, CONSUMER_GROUP);
+ }
+
+ @Test
+ public void testPulsarSourceFunction() throws Exception {
+ MultiTopicPulsarSourceFunction source = new MultiTopicPulsarSourceFunction(
+ testMetaManagerUtil.getConfig());
+ TestSourceContext<SerializedRecord> sourceContext = new TestSourceContext<>();
+ source.setRuntimeContext(
+ new MockStreamingRuntimeContext(false, 1, 0));
+
+ source.initializeState(
+ new MockFunctionInitializationContext(
+ false,
+ new MockOperatorStateStore(null, null)));
+ source.open(new Configuration());
+ final CheckedThread runThread =
+ new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ source.run(sourceContext);
+ }
+ };
+ runThread.start();
+ List<SerializedRecord> records = drain(sourceContext, TOTAL_COUNT);
+ assertEquals(TOTAL_COUNT, records.size());
+ source.cancel();
+ source.close();
+ runThread.stop();
+ }
+
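+ /**
+ * Serializes TOTAL_COUNT two-field rows and publishes each of them to the
+ * test topic as a Java-serialized {@link SerializedRecord}.
+ */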
+ protected void producerMessageToPulsar(String pulsarBrokerUrl) throws Exception {
+ RowSerializer rowSerializer = CommonUtils.generateRowSerializer(new RowFormatInfo(
+ new String[] {"f1", "f2"},
+ new FormatInfo[] {StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE}));
+ DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1024);
+
+ try (
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarBrokerUrl)
+ .build();
+ Producer<byte[]> producer = client.newProducer()
+ .topic(TEST_TOPIC)
+ .create()
+ ) {
+ for (int cnt = 0; cnt < TOTAL_COUNT; cnt++) {
+ Row row = Row.of(String.valueOf(cnt), String.valueOf(cnt));
+ // Reset the serializer so each record carries only its own row.
+ dataOutputSerializer.clear();
+ rowSerializer.serialize(row, dataOutputSerializer);
+ ByteArrayOutputStream byt = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(byt);
+ objectOutputStream.writeObject(new SerializedRecord(1L, 0, dataOutputSerializer.getCopyOfBuffer()));
+ // Flush so the buffered object bytes reach the underlying byte array.
+ objectOutputStream.flush();
+ producer.send(byt.toByteArray());
+ }
+ }
+ }
+
+ @After
+ public void stopPulsarContainer() {
+ if (pulsar != null) {
+ pulsar.close();
+ }
+ }
+
+ private static <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
+ throws Exception {
+ List<T> allRecords = new ArrayList<>();
+ LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
+ while (allRecords.size() < expectedRecordCount) {
+ StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
+ if (record != null) {
+ allRecords.add(record.getValue());
+ } else {
+ throw new RuntimeException(
+ "Can't receive " + expectedRecordCount + " elements before timeout.");
+ }
+ }
+
+ return allRecords;
+ }
+
+ private static class MockOperatorStateStore implements OperatorStateStore {
+
+ private final ListState<?> restoredOffsetListState;
+ private final ListState<?> restoredHistoryListState;
+
+ private MockOperatorStateStore(
+ ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
+ this.restoredOffsetListState = restoredOffsetListState;
+ this.restoredHistoryListState = restoredHistoryListState;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public <K, V> BroadcastState<K, V> getBroadcastState(
+ MapStateDescriptor<K, V> stateDescriptor) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<String> getRegisteredStateNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<String> getRegisteredBroadcastStateNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+ return null;
+ }
+
+ @Override
+ public <T extends Serializable> ListState<T> getSerializableListState(String s) throws Exception {
+ return null;
+ }
+ }
+
+ private static class MockFunctionInitializationContext
+ implements FunctionInitializationContext {
+
+ private final boolean isRestored;
+ private final OperatorStateStore operatorStateStore;
+
+ private MockFunctionInitializationContext(
+ boolean isRestored, OperatorStateStore operatorStateStore) {
+ this.isRestored = isRestored;
+ this.operatorStateStore = operatorStateStore;
+ }
+
+ @Override
+ public boolean isRestored() {
+ return isRestored;
+ }
+
+ @Override
+ public OperatorStateStore getOperatorStateStore() {
+ return operatorStateStore;
+ }
+
+ @Override
+ public KeyedStateStore getKeyedStateStore() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java
new file mode 100644
index 0000000..7f41a1c
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.sort.ZkTools;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.util.TestMetaManagerUtil;
+
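+/**
+ * A {@link TestMetaManagerUtil} that registers a data flow reading from a
+ * Pulsar source and writing to a Hive sink.
+ */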
+public class PulsarTestMetaManagerUtil extends TestMetaManagerUtil {
+
+ public PulsarTestMetaManagerUtil() throws Exception {
+ super();
+ }
+
+ @Override
+ public void addOrUpdateDataFlowInfo(long dataFlowId, String... args) throws Exception {
+ ZkTools.addDataFlowToCluster(cluster, dataFlowId, ZOOKEEPER.getConnectString(), zkRoot);
+ ZkTools.updateDataFlowInfo(
+ prepareDataFlowInfo(dataFlowId, args[0], args[1], args[2], args[3]),
+ cluster,
+ dataFlowId,
+ ZOOKEEPER.getConnectString(),
+ zkRoot);
+ }
+
+ @Override
+ public void initDataFlowInfo(String... args) throws Exception {
+ addOrUpdateDataFlowInfo(1L, args[0], args[1], args[2], args[3]);
+ }
+
+ @Override
+ public DataFlowInfo prepareDataFlowInfo(long dataFlowId, String... args) {
+
+ FieldInfo[] pulsarFields = new FieldInfo[] {
+ new FieldInfo("f1", StringFormatInfo.INSTANCE),
+ new FieldInfo("f2", StringFormatInfo.INSTANCE)
+ };
+
+ Map<String, Object> config = new HashMap<>();
+ config.put("pulsar.source.consumer.bootstrap-mode", "earliest");
+
+ return new DataFlowInfo(
+ dataFlowId,
+ new PulsarSourceInfo(
+ args[0],
+ args[1],
+ args[2],
+ args[3],
+ new CsvDeserializationInfo(','),
+ pulsarFields),
+ new HiveSinkInfo(
+ new FieldInfo[0],
+ "testServerJdbcUrl",
+ "testDatabaseName",
+ "testTableName",
+ "testUsername",
+ "testPassword",
+ "testDataPath",
+ new HivePartitionInfo[0],
+ new TextFileFormat(',')),
+ config
+ );
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java
new file mode 100644
index 0000000..1d6ff06
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A testable {@link SourceFunction.SourceContext}.
+ */
+public class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
+
+ private final Object checkpointLock = new Object();
+
+ private LinkedBlockingQueue<StreamRecord<T>> collectedOutputs = new LinkedBlockingQueue<>();
+
+ @Override
+ public void collect(T element) {
+ this.collectedOutputs.add(new StreamRecord<>(element));
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ this.collectedOutputs.offer(new StreamRecord<>(element, timestamp));
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ public StreamRecord<T> removeLatestOutput() {
+ return collectedOutputs.poll();
+ }
+
+ public LinkedBlockingQueue<StreamRecord<T>> getCollectedOutputs() {
+ return collectedOutputs;
+ }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java
new file mode 100644
index 0000000..9b4c386
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import org.apache.inlong.sort.ZkTools;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+
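+/**
+ * Base class for tests that need a {@link MetaManager} backed by an embedded
+ * ZooKeeper instance.
+ */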
+public abstract class TestMetaManagerUtil {
+
+ protected static ZooKeeperTestEnvironment ZOOKEEPER;
+
+ protected final String cluster = "cluster";
+
+ protected final String zkRoot = "/test";
+
+ protected final Configuration config = new Configuration();
+
+ protected final MetaManager metaManager;
+
+ public TestMetaManagerUtil() throws Exception {
+ ZOOKEEPER = new ZooKeeperTestEnvironment(1);
+ config.setString(Constants.CLUSTER_ID, cluster);
+ config.setString(Constants.ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
+ config.setString(Constants.ZOOKEEPER_ROOT, zkRoot);
+ metaManager = MetaManager.getInstance(config);
+ }
+
+ public Configuration getConfig() {
+ return config;
+ }
+
+ public void deleteDataFlowInfo(long dataFlowId) throws Exception {
+ ZkTools.removeDataFlowFromCluster(cluster, dataFlowId, ZOOKEEPER.getConnectString(), zkRoot);
+ }
+
+ public abstract void addOrUpdateDataFlowInfo(long dataFlowId, String... args) throws Exception;
+
+ public abstract void initDataFlowInfo(String... args) throws Exception;
+
+ public abstract DataFlowInfo prepareDataFlowInfo(long dataFlowId, String... args);
+
+ public void cleanup() throws Exception {
+ MetaManager.release();
+ ZOOKEEPER.shutdown();
+ }
+
+}