You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/22 04:05:27 UTC
[seatunnel] branch dev updated: [Feature][Json-format] support read format for pulsar (#4111)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7d61ae93e [Feature][Json-format] support read format for pulsar (#4111)
7d61ae93e is described below
commit 7d61ae93e72709238c54127a5ad62dde5df99130
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Mon May 22 12:05:20 2023 +0800
[Feature][Json-format] support read format for pulsar (#4111)
---
docs/en/Connector-v2-release-state.md | 2 +-
.../connector-v2/source/{pulsar.md => Pulsar.md} | 14 +-
release-note.md | 1 +
.../apache/seatunnel/common/utils/JsonUtils.java | 15 +-
seatunnel-connectors-v2/connector-pulsar/pom.xml | 8 +-
.../seatunnel/pulsar/config/SourceProperties.java | 24 ++
.../seatunnel/pulsar/source/PulsarSource.java | 61 +++-
.../source/enumerator/PulsarSplitEnumerator.java | 7 +-
.../enumerator/PulsarSplitEnumeratorState.java | 2 +-
.../pulsar/source/format/PulsarCanalDecorator.java | 103 +++++++
.../pulsar/source/reader/PulsarSourceReader.java | 121 ++++----
.../source/reader/PulsarSplitReaderThread.java | 3 +
.../pulsar/source/PulsarCanalDecoratorTest.java | 87 ++++++
.../{ => connector-pulsar-e2e}/pom.xml | 100 +++---
.../e2e/connector/pulsar/CanalToPulsarIT.java | 341 +++++++++++++++++++++
.../e2e/connector/pulsar/PulsarBatchIT.java | 179 +++++++++++
.../test/resources/batch_pulsar_to_console.conf | 162 ++++++++++
.../src/test/resources/cdc_canal_pulsar_to_pg.conf | 65 ++++
.../src/test/resources/ddl/canal.sql | 47 +++
.../src/test/resources/mysql/server-gtids/my.cnf | 60 ++++
.../src/test/resources/mysql/setup.sql | 27 ++
.../pulsar/canal-mysql-source-config.yaml | 26 ++
.../test/resources/pulsar/start_canal_connector.sh | 26 ++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../json/canal/CanalJsonDeserializationSchema.java | 8 +-
.../json/canal/CanalJsonSerDeSchemaTest.java | 2 +-
26 files changed, 1356 insertions(+), 136 deletions(-)
diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md
index 74a183c73..308cb010b 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -67,7 +67,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex
| [OssFile](connector-v2/source/OssFile.md) | Source | Beta | 2.2.0-beta |
| [Phoenix](connector-v2/sink/Phoenix.md) | Sink | Beta | 2.2.0-beta |
| [Phoenix](connector-v2/source/Phoenix.md) | Source | Beta | 2.2.0-beta |
-| [Pulsar](connector-v2/source/pulsar.md) | Source | Beta | 2.2.0-beta |
+| [Pulsar](connector-v2/source/Pulsar.md) | Source | Beta | 2.2.0-beta |
| [RabbitMQ](connector-v2/sink/Rabbitmq.md) | Sink | Beta | 2.3.0 |
| [RabbitMQ](connector-v2/source/Rabbitmq.md) | Source | Beta | 2.3.0 |
| [Redis](connector-v2/sink/Redis.md) | Sink | Beta | 2.2.0-beta |
diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/Pulsar.md
similarity index 91%
rename from docs/en/connector-v2/source/pulsar.md
rename to docs/en/connector-v2/source/Pulsar.md
index 3bdf8d235..9eba505e7 100644
--- a/docs/en/connector-v2/source/pulsar.md
+++ b/docs/en/connector-v2/source/Pulsar.md
@@ -37,6 +37,7 @@ Source connector for Apache Pulsar.
| cursor.stop.timestamp | Long | No | - |
| schema | config | No | - |
| common-options | | no | - |
+| format | String | no | json |
### topic [String]
@@ -126,9 +127,12 @@ Stop from the specified epoch timestamp (in milliseconds).
### schema [Config]
-#### fields [Config]
+The structure of the data, including field names and field types.
+Refer to [Schema-Feature](../../concept/schema-feature.md) for details.
-the schema fields of upstream data.
+### format [String]
+
+Data format. The default format is json; see [formats](../formats) for the supported formats.
### common options
@@ -141,7 +145,7 @@ source {
Pulsar {
topic = "example"
subscription.name = "seatunnel"
- client.service-url = "localhost:pulsar://localhost:6650"
+ client.service-url = "pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
result_table_name = "test"
}
@@ -154,3 +158,7 @@ source {
- Add Pulsar Source Connector
+### next version
+
+- [Feature] Add Pulsar canal-format and e2e ([4111](https://github.com/apache/incubator-seatunnel/pull/4111))
+
diff --git a/release-note.md b/release-note.md
index abdc5ac14..1ee3b4986 100644
--- a/release-note.md
+++ b/release-note.md
@@ -54,6 +54,7 @@
- [Connector-V2] [Jdbc] add the log for sql and update some style (#4475)
- [Connector-V2] [Jdbc] Fix the table name is not automatically obtained when multiple tables (#4514)
- [Connector-V2] [S3 & Kafka] Delete unavailable S3 & Kafka Catalogs (#4477)
+- [Connector-V2] [Pulsar] Support Canal Format (#4111)
### CI
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 15184e88e..181d03555 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -233,14 +233,15 @@ public class JsonUtils {
}
public static ObjectNode parseObject(String text) {
+ return parseObject(text.getBytes());
+ }
+
+ public static ObjectNode parseObject(byte[] content) {
try {
- if (text.isEmpty()) {
- return parseObject(text, ObjectNode.class);
- } else {
- return (ObjectNode) OBJECT_MAPPER.readTree(text);
- }
- } catch (Exception e) {
- throw new RuntimeException("String json deserialization exception.", e);
+ return (ObjectNode) OBJECT_MAPPER.readTree(content);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "String json deserialization exception." + new String(content), e);
}
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/pom.xml b/seatunnel-connectors-v2/connector-pulsar/pom.xml
index 6fcad7f58..12bbec591 100644
--- a/seatunnel-connectors-v2/connector-pulsar/pom.xml
+++ b/seatunnel-connectors-v2/connector-pulsar/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : Pulsar</name>
<properties>
- <pulsar.version>2.8.0</pulsar.version>
+ <pulsar.version>2.11.0</pulsar.version>
<commons-lang3.version>3.4</commons-lang3.version>
</properties>
@@ -100,5 +100,11 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-text</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
index 21ebcaef8..413693226 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatFactory;
public class SourceProperties {
@@ -168,6 +171,27 @@ public class SourceProperties {
.noDefaultValue()
.withDescription("Stop from the specified epoch timestamp (in milliseconds)");
+ public static final Option<Config> SCHEMA =
+ Options.key("schema")
+ .objectType(Config.class)
+ .noDefaultValue()
+ .withDescription(
+ "The structure of the data, including field names and field types.");
+
+ public static final Option<String> FORMAT =
+ Options.key("format")
+ .stringType()
+ .defaultValue(JsonFormatFactory.IDENTIFIER)
+ .withDescription(
+ "Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ + "If you customize the delimiter, add the \"field_delimiter\" option.");
+
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("Customize the field delimiter for data format.");
+
/** Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */
public enum StartMode {
/** Start from the earliest cursor possible. */
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index d41077922..cf5972aa1 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -28,11 +28,12 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
@@ -48,9 +49,14 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.JsonFormatFactory;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonFormatFactory;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
@@ -70,9 +76,11 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
@@ -80,10 +88,13 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
@AutoService(SeaTunnelSource.class)
-public class PulsarSource<T>
- implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
+public class PulsarSource
+ implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
SupportParallelism {
- private DeserializationSchema<T> deserialization;
+
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+ private SeaTunnelRowType typeInfo;
private PulsarAdminConfig adminConfig;
private PulsarClientConfig clientConfig;
@@ -291,11 +302,31 @@ public class PulsarSource<T>
}
private void setDeserialization(Config config) {
- String format = config.getString("format");
- // TODO: format SPI
- SeaTunnelRowType rowType = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
- deserialization =
- (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+ if (config.hasPath(SCHEMA.key())) {
+ typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+ String format = FORMAT.defaultValue();
+ if (config.hasPath(FORMAT.key())) {
+ format = config.getString(FORMAT.key());
+ }
+ switch (format) {
+ case JsonFormatFactory.IDENTIFIER:
+ deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+ break;
+ case CanalJsonFormatFactory.IDENTIFIER:
+ deserializationSchema =
+ new PulsarCanalDecorator(
+ CanalJsonDeserializationSchema.builder(typeInfo)
+ .setIgnoreParseErrors(true)
+ .build());
+ break;
+ default:
+ throw new SeaTunnelJsonFormatException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+ }
+ } else {
+ typeInfo = CatalogTableUtil.buildSimpleTextSchema();
+ this.deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+ }
}
@Override
@@ -306,19 +337,19 @@ public class PulsarSource<T>
}
@Override
- public SeaTunnelDataType<T> getProducedType() {
- return deserialization.getProducedType();
+ public SeaTunnelRowType getProducedType() {
+ return this.typeInfo;
}
@Override
- public SourceReader<T, PulsarPartitionSplit> createReader(SourceReader.Context readerContext)
- throws Exception {
+ public SourceReader<SeaTunnelRow, PulsarPartitionSplit> createReader(
+ SourceReader.Context readerContext) throws Exception {
return new PulsarSourceReader<>(
readerContext,
clientConfig,
consumerConfig,
startCursor,
- deserialization,
+ deserializationSchema,
pollTimeout,
pollInterval,
batchSize);
@@ -352,6 +383,6 @@ public class PulsarSource<T>
startCursor,
stopCursor,
consumerConfig.getSubscriptionName(),
- checkpointState.assignedPartitions());
+ checkpointState.getAssignedPartitions());
}
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index bc570513f..ddf6cb2d6 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -189,11 +189,12 @@ public class PulsarSplitEnumerator
}
private Set<TopicPartition> getNewPartitions(Set<TopicPartition> fetchedPartitions) {
- Consumer<TopicPartition> dedupOrMarkAsRemoved = fetchedPartitions::remove;
- assignedPartitions.forEach(dedupOrMarkAsRemoved);
+ Consumer<TopicPartition> duplicateOrMarkAsRemoved = fetchedPartitions::remove;
+ assignedPartitions.forEach(duplicateOrMarkAsRemoved);
pendingPartitionSplits.forEach(
(reader, splits) ->
- splits.forEach(split -> dedupOrMarkAsRemoved.accept(split.getPartition())));
+ splits.forEach(
+ split -> duplicateOrMarkAsRemoved.accept(split.getPartition())));
if (!fetchedPartitions.isEmpty()) {
LOG.info("Discovered new partitions: {}", fetchedPartitions);
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
index 8df46e7b4..b000da9fb 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
@@ -30,7 +30,7 @@ public class PulsarSplitEnumeratorState implements Serializable {
this.assignedPartitions = assignedPartitions;
}
- public Set<TopicPartition> assignedPartitions() {
+ public Set<TopicPartition> getAssignedPartitions() {
return assignedPartitions;
}
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java
new file mode 100644
index 000000000..d6cc201a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.format;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * For the Pulsar connector, each message wraps the canal records in the following format:
+ *
+ * <p>{ "id":0, "message":"[{pulsar-data based on canal}]", "timestamp":"" }
+ */
+public class PulsarCanalDecorator implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final String MESSAGE = "message";
+ private static final String FIELD_DATA = "data";
+ private static final String FIELD_OLD = "old";
+ public static final String COLUMN_NAME = "columnName";
+ public static final String COLUMN_VALUE = "columnValue";
+ public static final String COLUMN_INDEX = "index";
+
+ private final CanalJsonDeserializationSchema canalJsonDeserializationSchema;
+
+ public PulsarCanalDecorator(CanalJsonDeserializationSchema canalJsonDeserializationSchema) {
+ this.canalJsonDeserializationSchema = canalJsonDeserializationSchema;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+ ObjectNode pulsarCanal = JsonUtils.parseObject(message);
+ ArrayNode canalList = JsonUtils.parseArray(pulsarCanal.get(MESSAGE).asText());
+ Iterator<JsonNode> canalIterator = canalList.elements();
+ while (canalIterator.hasNext()) {
+ JsonNode next = canalIterator.next();
+ // reconvert pulsar handler, reference to
+ // https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
+ ObjectNode root = reconvertPulsarData((ObjectNode) next);
+ canalJsonDeserializationSchema.deserialize(root, out);
+ }
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return canalJsonDeserializationSchema.getProducedType();
+ }
+
+ private ObjectNode reconvertPulsarData(ObjectNode root) {
+ root.replace(FIELD_DATA, reconvert(root.get(FIELD_DATA)));
+ root.replace(FIELD_OLD, reconvert(root.get(FIELD_OLD)));
+ return root;
+ }
+
+ private JsonNode reconvert(JsonNode node) {
+ if (!(node instanceof ArrayNode) || node.size() <= 0) {
+ return node;
+ }
+ long firstColumn = node.get(0).get(COLUMN_INDEX).asLong();
+ ArrayNode arrayNode = JsonUtils.createArrayNode();
+ ObjectNode rowMap = JsonUtils.createObjectNode();
+ for (int i = 0; i < node.size(); i++) {
+ ObjectNode columnNode = (ObjectNode) node.get(i);
+ if (firstColumn == columnNode.get(COLUMN_INDEX).asLong()) {
+ arrayNode.add(rowMap);
+ rowMap = JsonUtils.createObjectNode();
+ }
+ rowMap.set(columnNode.get(COLUMN_NAME).asText(), columnNode.get(COLUMN_VALUE));
+ }
+ arrayNode.add(rowMap);
+ arrayNode.remove(0);
+ return arrayNode;
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
index c008d4592..1ff0ddfb9 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.Handover;
@@ -111,19 +112,16 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
if (pulsarClient != null) {
pulsarClient.close();
}
- splitReaders
- .values()
- .forEach(
- reader -> {
- try {
- reader.close();
- } catch (IOException e) {
- throw new PulsarConnectorException(
- CommonErrorCode.READER_OPERATION_FAILED,
- "Failed to close the split reader thread.",
- e);
- }
- });
+ for (PulsarSplitReaderThread pulsarSplitReaderThread : splitReaders.values()) {
+ try {
+ pulsarSplitReaderThread.close();
+ } catch (IOException e) {
+ throw new PulsarConnectorException(
+ CommonErrorCode.READER_OPERATION_FAILED,
+ "Failed to close the split reader thread.",
+ e);
+ }
+ }
}
@Override
@@ -167,25 +165,23 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
@Override
public void addSplits(List<PulsarPartitionSplit> splits) {
- splits.forEach(
- split -> {
- splitStates.put(split.splitId(), split);
- PulsarSplitReaderThread splitReaderThread =
- createPulsarSplitReaderThread(split);
- try {
- splitReaderThread.setName(
- "Pulsar Source Data Consumer "
- + split.getPartition().getPartition());
- splitReaderThread.open();
- splitReaders.put(split.splitId(), splitReaderThread);
- splitReaderThread.start();
- } catch (PulsarClientException e) {
- throw new PulsarConnectorException(
- CommonErrorCode.READER_OPERATION_FAILED,
- "Failed to start the split reader thread.",
- e);
- }
- });
+ for (PulsarPartitionSplit split : splits) {
+ splitStates.put(split.splitId(), split);
+ PulsarSplitReaderThread splitReaderThread = createPulsarSplitReaderThread(split);
+ try {
+ splitReaderThread.setName(
+ "Pulsar Source Data Consumer " + split.getPartition().getPartition());
+ splitReaderThread.open();
+ splitReaders.put(split.splitId(), splitReaderThread);
+ splitReaderThread.start();
+ LOG.info("PulsarSplitReaderThread = {} start", splitReaderThread.getName());
+ } catch (PulsarClientException e) {
+ throw new PulsarConnectorException(
+ CommonErrorCode.READER_OPERATION_FAILED,
+ "Failed to start the split reader thread.",
+ e);
+ }
+ }
}
protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit split) {
@@ -203,6 +199,10 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
public void handleNoMoreElements(String splitId, MessageId messageId) {
LOG.info("Reader received the split {} NoMoreElements event.", splitId);
pendingCursorsToFinish.put(splitId, messageId);
+ // In BOUNDED mode, snapshot and notifyCheckpointComplete are not triggered, so mark the split as finished here
+ if (context.getBoundedness() == Boundedness.BOUNDED) {
+ finishedSplits.add(splitId);
+ }
}
@Override
@@ -221,32 +221,35 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
checkpointId);
return;
}
- pendingCursors.forEach(
- (splitId, messageId) -> {
- if (finishedSplits.contains(splitId)) {
- return;
- }
- try {
- splitReaders.get(splitId).committingCursor(messageId);
-
- if (pendingCursorsToFinish.containsKey(splitId)
- && pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
- finishedSplits.add(splitId);
- try {
- splitReaders.get(splitId).close();
- } catch (IOException e) {
- throw new PulsarConnectorException(
- CommonErrorCode.READER_OPERATION_FAILED,
- "Failed to close the split reader thread.",
- e);
- }
- }
- } catch (PulsarClientException e) {
- throw new PulsarConnectorException(
- PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
- "pulsar consumer acknowledgeCumulative failed.",
- e);
- }
- });
+ pendingCursors.forEach(this::committingCursor);
+ }
+
+ /** Commits the cursor of the consumer thread for the given split. */
+ private void committingCursor(String splitId, MessageId messageId) {
+ if (finishedSplits.contains(splitId)) {
+ return;
+ }
+ try {
+ PulsarSplitReaderThread pulsarSplitReaderThread = splitReaders.get(splitId);
+ pulsarSplitReaderThread.committingCursor(messageId);
+
+ if (pendingCursorsToFinish.containsKey(splitId)
+ && pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
+ finishedSplits.add(splitId);
+ try {
+ pulsarSplitReaderThread.close();
+ } catch (IOException e) {
+ throw new PulsarConnectorException(
+ CommonErrorCode.READER_OPERATION_FAILED,
+ "Failed to close the split reader thread.",
+ e);
+ }
+ }
+ } catch (PulsarClientException e) {
+ throw new PulsarConnectorException(
+ PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
+ "pulsar consumer acknowledgeCumulative failed.",
+ e);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
index 7dfd0d595..301d62d96 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -100,6 +100,7 @@ public class PulsarSplitReaderThread extends Thread implements Closeable {
Thread.sleep(pollInterval);
}
} catch (Throwable t) {
+ LOG.error("Pulsar Consumer receive data error", t);
handover.reportError(t);
} finally {
// make sure the PulsarConsumer is closed
@@ -107,6 +108,8 @@ public class PulsarSplitReaderThread extends Thread implements Closeable {
consumer.close();
} catch (Throwable t) {
LOG.warn("Error while closing pulsar consumer", t);
+ } finally {
+ running = false;
}
}
}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
new file mode 100644
index 000000000..0076bbbdb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.Getter;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PulsarCanalDecoratorTest {
+ private static final String json =
+ "{"
+ + " \"id\": 3,\n"
+ + " \"message\": \"[{\\\"data\\\":[{\\\"isKey\\\":\\\"1\\\",\\\"isNull\\\":\\\"0\\\",\\\"index\\\":\\\"0\\\",\\\"mysqlType\\\":\\\"INTEGER\\\",\\\"columnName\\\":\\\"id\\\",\\\"columnValue\\\":\\\"109\\\",\\\"updated\\\":\\\"0\\\"},{\\\"isKey\\\":\\\"0\\\",\\\"isNull\\\":\\\"0\\\",\\\"index\\\":\\\"1\\\",\\\"mysqlType\\\":\\\"VARCHAR(255)\\\",\\\"columnName\\\":\\\"name\\\",\\\"columnValue\\\":\\\"spare tire\\\",\\\"updated\\\":\\\"0\\\"},{\\\"isKey\\\":\\\"0\\\",\\\ [...]
+ + " \"timestamp\": \"2023-04-02 05:06:58\""
+ + "}";
+
+ @Test
+ void decoder() throws IOException {
+ String[] fieldNames = new String[] {"id", "name", "description", "weight"};
+ SeaTunnelDataType<?>[] dataTypes =
+ new SeaTunnelDataType[] {
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ };
+
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, dataTypes);
+
+ CanalJsonDeserializationSchema canalJsonDeserializationSchema =
+ CanalJsonDeserializationSchema.builder(seaTunnelRowType).build();
+ PulsarCanalDecorator pulsarCanalDecorator =
+ new PulsarCanalDecorator(canalJsonDeserializationSchema);
+
+ SimpleCollector simpleCollector = new SimpleCollector();
+ pulsarCanalDecorator.deserialize(json.getBytes(StandardCharsets.UTF_8), simpleCollector);
+ Assertions.assertFalse(simpleCollector.getList().isEmpty());
+ for (SeaTunnelRow seaTunnelRow : simpleCollector.list) {
+ for (Object field : seaTunnelRow.getFields()) {
+ Assertions.assertNotNull(field);
+ }
+ }
+ }
+
+ private static class SimpleCollector implements Collector<SeaTunnelRow> {
+ @Getter private List<SeaTunnelRow> list = new ArrayList<>();
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ list.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null;
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
similarity index 52%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
index 498a33054..2d9e31c33 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
@@ -18,83 +18,97 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
- <artifactId>seatunnel-connector-v2-e2e</artifactId>
- <packaging>pom</packaging>
- <name>SeaTunnel : E2E : Connector V2 :</name>
+ <artifactId>connector-pulsar-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Pulsar</name>
- <modules>
- <module>connector-assert-e2e</module>
- <module>connector-jdbc-e2e</module>
- <module>connector-redis-e2e</module>
- <module>connector-cdc-sqlserver-e2e</module>
- <module>connector-clickhouse-e2e</module>
- <module>connector-starrocks-e2e</module>
- <module>connector-influxdb-e2e</module>
- <module>connector-amazondynamodb-e2e</module>
- <module>connector-file-local-e2e</module>
- <module>connector-file-sftp-e2e</module>
- <module>connector-cassandra-e2e</module>
- <module>connector-neo4j-e2e</module>
- <module>connector-http-e2e</module>
- <module>connector-rabbitmq-e2e</module>
- <module>connector-kafka-e2e</module>
- <module>connector-doris-e2e</module>
- <module>connector-fake-e2e</module>
- <module>connector-elasticsearch-e2e</module>
- <module>connector-iotdb-e2e</module>
- <module>connector-cdc-mysql-e2e</module>
- <module>connector-iceberg-e2e</module>
- <module>connector-iceberg-hadoop3-e2e</module>
- <module>connector-tdengine-e2e</module>
- <module>connector-datahub-e2e</module>
- <module>connector-mongodb-e2e</module>
- <module>connector-hbase-e2e</module>
- <module>connector-maxcompute-e2e</module>
- <module>connector-google-firestore-e2e</module>
- <module>connector-rocketmq-e2e</module>
- <module>connector-paimon-e2e</module>
- </modules>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-jdbc</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e-common</artifactId>
+ <artifactId>connector-pulsar</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-13-starter</artifactId>
+ <artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-15-starter</artifactId>
+ <artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-2-starter</artifactId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-3-starter</artifactId>
+ <artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-starter</artifactId>
+ <artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
new file mode 100644
index 000000000..716bd7dc9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * Change data captured by the Canal server is published to Pulsar, and SeaTunnel is the consumer.
+ * Reference: https://pulsar.apache.org/docs/2.11.x/io-canal-source/
+ */
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "spark would ignore delete type")
+public class CanalToPulsarIT extends TestSuiteBase implements TestResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CanalToPulsarIT.class);
+
+ // ----------------------------------------------------------------------------
+ // mysql
+ private static final String MYSQL_HOST = "mysql.e2e";
+
+ private static final int MYSQL_PORT = 3306;
+ public static final String MYSQL_USER = "st_user";
+ public static final String MYSQL_PASSWORD = "seatunnel";
+
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    /**
+     * Builds (without starting) the MySQL container used as the Canal source database.
+     *
+     * @param version MySQL image version to run
+     * @return a container joined to the shared test network under the {@code MYSQL_HOST} alias,
+     *     configured with the test {@code my.cnf}, the setup script and the test credentials
+     */
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer container =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("mysql/server-gtids/my.cnf")
+                        .withSetupSQL("mysql/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName("canal")
+                        .withUsername(MYSQL_USER)
+                        .withPassword(MYSQL_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        // Kept as a separate statement: its return value is not needed for the assignment above.
+        container.withExposedPorts(MYSQL_PORT);
+        return container;
+    }
+
+ // ----------------------------------------------------------------------------
+ // postgres
+ private static final String PG_IMAGE = "postgres:alpine3.16";
+
+ private static final String PG_DRIVER_JAR =
+ "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+ private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    /**
+     * Container extension that downloads the PostgreSQL JDBC driver into the Jdbc plugin
+     * {@code lib} directory inside the engine container and asserts that the download command
+     * exited with status 0.
+     */
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                String downloadDriver =
+                        "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                + PG_DRIVER_JAR;
+                Container.ExecResult result =
+                        container.execInContainer("bash", "-c", downloadDriver);
+
+                Assertions.assertEquals(0, result.getExitCode());
+            };
+
+    /**
+     * Builds (without starting) the PostgreSQL sink container and stores it in
+     * {@code POSTGRESQL_CONTAINER}. The container joins the shared test network under the alias
+     * {@code postgresql} and exposes port 5432.
+     */
+    private void createPostgreSQLContainer() throws ClassNotFoundException {
+        DockerImageName image = DockerImageName.parse(PG_IMAGE);
+        Slf4jLogConsumer logConsumer =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE));
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(image)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withExposedPorts(5432)
+                        .withLogConsumer(logConsumer);
+    }
+
+    /**
+     * Creates the {@code sink} table in the PostgreSQL container.
+     *
+     * @throws RuntimeException wrapping the {@link SQLException} when connecting to the database
+     *     or executing the DDL fails
+     */
+    private void initializeJdbcTable() {
+        String sink =
+                "create table sink(\n"
+                        + "id INT NOT NULL PRIMARY KEY,\n"
+                        + "name varchar(255),\n"
+                        + "description varchar(255),\n"
+                        + "weight varchar(255)"
+                        + ")";
+        // The statement is declared as a resource too, so it is closed explicitly instead of
+        // being left open until the connection goes away.
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                                POSTGRESQL_CONTAINER.getUsername(),
+                                POSTGRESQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+ // ----------------------------------------------------------------------------
+ // canal
+ private static GenericContainer<?> CANAL_CONTAINER;
+
+ private static final String CANAL_DOCKER_IMAGE = "canal/canal-server:v1.1.2";
+
+ private static final String CANAL_HOST = "canal.e2e";
+
+    /**
+     * Builds (without starting) the Canal server container and stores it in
+     * {@code CANAL_CONTAINER}. The Canal instance is pointed at the MySQL container through its
+     * network alias and given the MySQL test credentials via environment variables.
+     */
+    private void createCanalContainer() {
+        String mysqlAddress = String.format("%s:%s", MYSQL_HOST, MYSQL_PORT);
+        Slf4jLogConsumer logConsumer =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE));
+
+        CANAL_CONTAINER =
+                new GenericContainer<>(CANAL_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CANAL_HOST)
+                        .withLogConsumer(logConsumer)
+                        .withEnv("canal.auto.scan", "false")
+                        .withEnv("canal.destinations", "test")
+                        .withEnv("canal.instance.master.address", mysqlAddress)
+                        .withEnv("canal.instance.dbUsername", MYSQL_USER)
+                        .withEnv("canal.instance.dbPassword", MYSQL_PASSWORD)
+                        .withEnv("canal.instance.connectionCharset", "UTF-8")
+                        .withEnv("canal.instance.tsdb.enable", "true")
+                        .withEnv("canal.instance.gtidon", "false");
+    }
+
+ // ----------------------------------------------------------------------------
+ // pulsar container
+ // Downloading the Canal connector is very slow, so use an image built with the Canal connector from apache/pulsar.
+ private static final String PULSAR_IMAGE_NAME = "laglangyue/pulsar_canal:2.3.1";
+
+ private static final String PULSAR_HOST = "pulsar.e2e";
+ private static final String TOPIC = "test-cdc_mds";
+
+ private static final Integer PULSAR_BROKER_PORT = 6650;
+ private static final Integer PULSAR_BROKER_HTTP_PORT = 8080;
+
+ private static GenericContainer<?> PULSAR_CONTAINER;
+
+    /**
+     * Builds (without starting) the Pulsar container and stores it in {@code PULSAR_CONTAINER}.
+     * The Canal source connector configuration and its start script are copied into the
+     * container, and start-up waits until both the {@code pulsar} and the {@code canal}
+     * "entered RUNNING state" lines have appeared in the container log.
+     */
+    private void createPulsarContainer() {
+        Slf4jLogConsumer logConsumer =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME));
+        PULSAR_CONTAINER =
+                new GenericContainer<>(DockerImageName.parse(PULSAR_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PULSAR_HOST)
+                        .withLogConsumer(logConsumer);
+        PULSAR_CONTAINER.withExposedPorts(PULSAR_BROKER_PORT, PULSAR_BROKER_HTTP_PORT);
+
+        // Canal connector configuration.
+        MountableFile connectorConfig =
+                MountableFile.forClasspathResource("pulsar/canal-mysql-source-config.yaml");
+        PULSAR_CONTAINER.withCopyFileToContainer(connectorConfig, "/pulsar/conf/");
+        // Script that starts the connector.
+        MountableFile startScript =
+                MountableFile.forClasspathResource("pulsar/start_canal_connector.sh");
+        PULSAR_CONTAINER.withCopyFileToContainer(startScript, "/pulsar/");
+
+        // Both log lines must be seen before the container counts as started.
+        WaitStrategy pulsarRunning = Wait.forLogMessage(".*pulsar entered RUNNING state.*", 1);
+        WaitStrategy canalRunning = Wait.forLogMessage(".*canal entered RUNNING state.*", 1);
+        WaitAllStrategy allRunning =
+                new WaitAllStrategy().withStrategy(pulsarRunning).withStrategy(canalRunning);
+        PULSAR_CONTAINER.waitingFor(allRunning);
+    }
+
+    /**
+     * Blocks until a topic whose name contains {@code TOPIC} is listed in the
+     * {@code public/default} namespace, polling the Pulsar admin API every five seconds.
+     *
+     * @throws PulsarClientException if the admin client cannot be built
+     * @throws RuntimeException if listing the topics fails or the waiting thread is interrupted
+     */
+    private void waitForTopicCreated() throws PulsarClientException {
+        String adminUrl =
+                String.format(
+                        "http://%s:%s",
+                        PULSAR_CONTAINER.getHost(),
+                        PULSAR_CONTAINER.getMappedPort(PULSAR_BROKER_HTTP_PORT));
+        try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            while (true) {
+                List<String> topics = pulsarAdmin.topics().getList("public/default");
+                if (topics.stream().anyMatch(t -> StringUtils.contains(t, TOPIC))) {
+                    return;
+                }
+                Thread.sleep(5000);
+            }
+        } catch (PulsarAdminException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe the interrupt.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+ /**
+ * Starts the test environment in dependency order: MySQL, then Canal, then Pulsar, then
+ * PostgreSQL. After Pulsar is up it waits for the Canal topic to appear, initializes the MySQL
+ * test database, pauses ten seconds, and finally creates the sink table in PostgreSQL.
+ *
+ * @throws ClassNotFoundException if the PostgreSQL JDBC driver class cannot be loaded
+ * @throws InterruptedException if the thread is interrupted during the fixed pause
+ */
+ @BeforeAll
+ @Override
+ public void startUp() throws ClassNotFoundException, InterruptedException {
+ LOG.info("The second stage: Starting Mysql containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Mysql Containers are started");
+
+ LOG.info("The third stage: Starting Canal containers...");
+ createCanalContainer();
+ Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+ LOG.info("Canal Containers are started");
+
+ LOG.info("Starting Pulsar containers...");
+ createPulsarContainer();
+ Startables.deepStart(Stream.of(PULSAR_CONTAINER)).join();
+ LOG.info("Pulsar Containers are started");
+ // Retry for up to five minutes until the Canal topic is visible in Pulsar.
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.MINUTES)
+ .untilAsserted(this::waitForTopicCreated);
+ // The Pulsar Canal connector must already be running before the DDL/DML below is executed.
+ inventoryDatabase.createAndInitialize();
+ // Give Pulsar time to receive the change data from the Canal server.
+ Thread.sleep(10 * 1000);
+ LOG.info("The fourth stage: Starting PostgresSQL container...");
+ createPostgreSQLContainer();
+ Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+ // Load the JDBC driver class reported by the container before DriverManager is used.
+ Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+ LOG.info("postgresql Containers are started");
+ // Retry for up to five minutes until the sink table can be created.
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(5, TimeUnit.MINUTES)
+ .untilAsserted(this::initializeJdbcTable);
+ }
+
+    /**
+     * Stops every container this test started: Pulsar, Canal, MySQL and PostgreSQL.
+     *
+     * <p>Containers that are created lazily during {@code startUp()} are null-checked, so a
+     * start-up that failed part-way does not turn tear-down into a NullPointerException.
+     */
+    // Registered explicitly, mirroring the @BeforeAll on startUp(); the fully qualified name
+    // avoids adding an import.
+    @org.junit.jupiter.api.AfterAll
+    @Override
+    public void tearDown() {
+        if (PULSAR_CONTAINER != null) {
+            PULSAR_CONTAINER.close();
+        }
+        if (CANAL_CONTAINER != null) {
+            CANAL_CONTAINER.close();
+        }
+        MYSQL_CONTAINER.close();
+        // The PostgreSQL sink container was previously never stopped.
+        if (POSTGRESQL_CONTAINER != null) {
+            POSTGRESQL_CONTAINER.close();
+        }
+    }
+
+    /**
+     * Runs the Canal-to-PostgreSQL job and checks that the {@code sink} table ends up holding
+     * exactly the expected product rows, ordered by id.
+     *
+     * @param container engine container the job is submitted to
+     */
+    @TestTemplate
+    void testCanalFormatMessages(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        Container.ExecResult jobResult = container.executeJob("/cdc_canal_pulsar_to_pg.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+
+        List<List<Object>> expected =
+                Lists.newArrayList(
+                        Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"),
+                        Arrays.asList(102, "car battery", "12V car battery", "8.1"),
+                        Arrays.asList(
+                                103,
+                                "12-pack drill bits",
+                                "12-pack of drill bits with sizes ranging from #40 to #3",
+                                "0.8"),
+                        Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"),
+                        Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"),
+                        Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1.0"),
+                        Arrays.asList(107, "rocks", "box of assorted rocks", "7.88"),
+                        Arrays.asList(108, "jacket", "water resistent black wind breaker", "0.1"));
+
+        // Read back what the job wrote to the sink table.
+        List<List<Object>> actual = new ArrayList<>();
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                                POSTGRESQL_CONTAINER.getUsername(),
+                                POSTGRESQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            ResultSet rows = statement.executeQuery("SELECT * FROM sink ORDER BY id");
+            while (rows.next()) {
+                actual.add(
+                        Arrays.asList(
+                                rows.getInt("id"),
+                                rows.getString("name"),
+                                rows.getString("description"),
+                                rows.getString("weight")));
+            }
+        }
+        Assertions.assertIterableEquals(expected, actual);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
new file mode 100644
index 000000000..b1ea69efa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeDataGenerator;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class PulsarBatchIT extends TestSuiteBase implements TestResource {
+
+ private static final String PULSAR_IMAGE_NAME = "apachepulsar/pulsar:2.3.1";
+ public static final String PULSAR_HOST = "pulsar.batch.e2e";
+ public static final String TOPIC = "topic-it";
+ private PulsarContainer pulsarContainer;
+ private PulsarClient client;
+ private Producer<byte[]> producer;
+
+ private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+ new SeaTunnelRowType(
+ new String[] {
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[] {
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+ ArrayType.INT_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(38, 10),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ pulsarContainer =
+ new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(PULSAR_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
+
+ Startables.deepStart(Stream.of(pulsarContainer)).join();
+ Awaitility.given()
+ .ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initTopic);
+ }
+
+    /**
+     * Releases the Pulsar resources in reverse order of creation: producer, then client, then
+     * the broker container. Closing the container first would leave the producer and client
+     * trying to shut down against a broker that is already gone.
+     *
+     * <p>Each resource is null-checked because start-up may have failed before it was created,
+     * and the nested {@code finally} blocks make sure a failure while closing one resource does
+     * not prevent the remaining ones from being closed.
+     *
+     * @throws Exception if closing the producer, the client or the container fails
+     */
+    // Registered explicitly, mirroring the @BeforeAll on startUp(); the fully qualified name
+    // avoids adding an import.
+    @org.junit.jupiter.api.AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (producer != null) {
+                producer.close();
+                producer = null;
+            }
+        } finally {
+            try {
+                if (client != null) {
+                    client.close();
+                    client = null;
+                }
+            } finally {
+                if (pulsarContainer != null) {
+                    pulsarContainer.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the Pulsar client and the producer for {@code TOPIC} if they do not exist yet and
+     * then publishes the test data.
+     *
+     * <p>This method is retried by the polling loop in {@code startUp()}. The client and the
+     * producer are therefore only created when still unset, so a retry reuses what an earlier
+     * attempt already built instead of overwriting (and leaking) it.
+     *
+     * @throws PulsarClientException if the client or the producer cannot be created
+     */
+    private void initTopic() throws PulsarClientException {
+        if (client == null) {
+            client =
+                    PulsarClient.builder()
+                            .serviceUrl(pulsarContainer.getPulsarBrokerUrl())
+                            .build();
+        }
+        if (producer == null) {
+            producer = client.newProducer(Schema.BYTES).topic(TOPIC).create();
+        }
+        produceData();
+    }
+
+    /**
+     * Generates 100 fake rows matching {@code SEATUNNEL_ROW_TYPE}, serializes each one as JSON
+     * and sends it through {@code producer}.
+     *
+     * @throws RuntimeException wrapping the {@link PulsarClientException} if sending fails
+     */
+    private void produceData() {
+        try {
+            FakeConfig defaults = FakeConfig.buildWithConfig(ConfigFactory.empty());
+            SimpleCollector generated = new SimpleCollector();
+            new FakeDataGenerator(SEATUNNEL_ROW_TYPE, defaults).collectFakedRows(100, generated);
+
+            JsonSerializationSchema serializer = new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);
+            for (SeaTunnelRow row : generated.getList()) {
+                byte[] payload = serializer.serialize(row);
+                producer.send(payload);
+            }
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("produce data error", e);
+        }
+    }
+
+    /** Test-only {@link Collector} that keeps every collected row in an in-memory list. */
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+        /** Rows in the order they were collected; read through the Lombok-generated getter. */
+        @Getter private final List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow row) {
+            list.add(row);
+        }
+
+        /** This collector supplies no checkpoint lock and always returns {@code null}. */
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+    /**
+     * Runs the batch Pulsar-to-console job and expects it to exit with status 0.
+     *
+     * @param container engine container the job is submitted to
+     */
+    @TestTemplate
+    void testPulsarBatch(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/batch_pulsar_to_console.conf");
+        // JUnit expects (expected, actual); the job's stderr is attached so a failing run
+        // reports why it failed rather than only the exit code.
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
new file mode 100644
index 000000000..59d2efc86
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Pulsar {
+ client.service-url = "pulsar://pulsar.batch.e2e:6650"
+ admin.service-url = "http://pulsar.batch.e2e:8080"
+ subscription.name = "e2e"
+ topic = "topic-it"
+ cursor.startup.mode = "EARLIEST"
+ cursor.stop.mode = "LATEST"
+ format = json
+ result_table_name = "pulsar_canal"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_float
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_tinyint
+ field_type = byte
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_smallint
+ field_type = short
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_int
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_bigint
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_timestamp
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf
new file mode 100644
index 000000000..c287be676
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Pulsar {
+ client.service-url = "pulsar://pulsar.e2e:6650"
+ admin.service-url = "http://pulsar.e2e:8080"
+ subscription.name = "e2e_canal_cdc_subscription"
+ topic = "test-cdc_mds"
+ cursor.startup.mode = "EARLIEST"
+ cursor.stop.mode = "LATEST"
+ format = canal_json
+ result_table_name = "pulsar_canal"
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ }
+ }
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ generate_sink_sql = true
+ database = public
+ table = sink
+ primary_keys = ["id"]
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql
new file mode 100644
index 000000000..ed75e315b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: canal
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products
+(
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ description VARCHAR(512),
+ weight VARCHAR(512)
+);
+ALTER TABLE products
+ AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default, "scooter", "Small 2-wheel scooter", "3.14"),
+ (default, "car battery", "12V car battery", "8.1"),
+ (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", "0.8"),
+ (default, "hammer", "12oz carpenter's hammer", "0.75"),
+ (default, "hammer", "14oz carpenter's hammer", "0.875"),
+ (default, "hammer", "16oz carpenter's hammer", "1.0"),
+ (default, "rocks", "box of assorted rocks", "5.3"),
+ (default, "jacket", "water resistent black wind breaker", "0.1"),
+ (default, "spare tire", "24 inch spare tire", "22.2");
+
+UPDATE products SET weight = '4.56' WHERE name = 'scooter';
+UPDATE products SET weight = '7.88' WHERE name = 'rocks';
+
+DELETE FROM products WHERE name = "spare tire";
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf
new file mode 100644
index 000000000..2a0e40ba1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+binlog_format = row
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..e2b73c3ec
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly restrict the replication user to connections from the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'st_user' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'st_user'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml
new file mode 100644
index 000000000..af6e3f7bd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+configs:
+ zkServers: ""
+ batchSize: "5120"
+ destination: "test"
+ username: ""
+ password: ""
+ cluster: false
+ singleHostname: "canal.e2e"
+ singlePort: "11111"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh
new file mode 100644
index 000000000..de6508fbb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+./bin/pulsar-admin source localrun \
+ --archive ./connectors/pulsar-io-canal-2.3.0.nar \
+ --classname org.apache.pulsar.io.canal.CanalStringSource \
+ --tenant public \
+ --namespace default \
+ --name canal \
+ --destination-topic-name test-cdc_mds \
+ --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
+ --parallelism 1
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 498a33054..070ab6e93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -56,6 +56,7 @@
<module>connector-maxcompute-e2e</module>
<module>connector-google-firestore-e2e</module>
<module>connector-rocketmq-e2e</module>
+ <module>connector-pulsar-e2e</module>
<module>connector-paimon-e2e</module>
</modules>
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 948c0e91e..44cd6a9a8 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -120,6 +120,10 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
}
ObjectNode jsonNode = (ObjectNode) convertBytes(message);
assert jsonNode != null;
+ deserialize(jsonNode, out);
+ }
+
+ public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
if (database != null
&& !databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
return;
@@ -137,7 +141,7 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
}
throw new SeaTunnelJsonFormatException(
CommonErrorCode.JSON_OPERATION_FAILED,
- format("Null data value \"%s\" Cannot send downstream", new String(message)));
+ format("Null data value \"%s\" Cannot send downstream", jsonNode));
}
if (OP_INSERT.equals(type)) {
for (int i = 0; i < dataNode.size(); i++) {
@@ -179,7 +183,7 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
format(
"Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
- type, new String(message)));
+ type, jsonNode));
}
}
}
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index 16938c003..c7544f6db 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -65,7 +65,7 @@ public class CanalJsonSerDeSchemaTest {
createCanalJsonDeserializationSchema(null, null);
final SimpleCollector collector = new SimpleCollector();
- deserializationSchema.deserialize(null, collector);
+ deserializationSchema.deserialize((byte[]) null, collector);
assertEquals(0, collector.list.size());
}