You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/29 06:41:40 UTC
[pulsar] branch master updated: Debezium: integrate
KafkaConnectSource with MySqlConnectorTask (#2791)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d13fe5b Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791)
d13fe5b is described below
commit d13fe5bc287d2bc082da1682703d01dc92140a3b
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Mon Oct 29 14:41:36 2018 +0800
Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791)
### Motivation
This change integrates KafkaConnectSource with MySqlConnectorTask.
To run it, follow these steps:
1. Start a MySQL server using the image from the Debezium tutorial:
```docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8```
2. Start Pulsar in standalone mode:
```bin/pulsar standalone ```
3. Start consuming from the connector source's destination topic. Once step 4 is running, this consumer should receive data:
```bin/pulsar-client consume -s "my-sub-name" kafka-connect-topic -n 0```
4. Start the Kafka Connect source; it should produce data into Pulsar topics:
```bin/pulsar-admin source localrun --tenant public --namespace default --name kafka --destination-topic-name kafka-connect-topic --sourceConfigFile debezium-mysql-source-config.yaml --archive connectors/pulsar-io-kafka-connect-adaptor-2.2.0-SNAPSHOT.nar --brokerServiceUrl pulsar://127.0.0.1:6650```
---
distribution/io/src/assemble/io.xml | 6 +++
pom.xml | 2 +-
pulsar-io/debezium/pom.xml | 8 ++--
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 2 +-
pulsar-io/kafka-connect-adaptor/pom.xml | 22 ++++++++++
.../io/kafka/connect/KafkaConnectSource.java | 27 +++++++++---
.../resources/META-INF/services/pulsar-io.yaml | 22 ++++++++++
.../resources/debezium-mysql-source-config.yaml | 51 ++++++++++++++++++++++
.../io/kafka/connect/KafkaConnectSourceTest.java | 6 +--
9 files changed, 131 insertions(+), 15 deletions(-)
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index c527fc7..1a190a4 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -93,5 +93,11 @@
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
+ <file>
+ <source>${basedir}/../../pulsar-io/kafka-connect-adaptor/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source>
+ <outputDirectory>connectors</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
</files>
</assembly>
diff --git a/pom.xml b/pom.xml
index eac83c0..ef870ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>0.206</presto.version>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
- <debezium-core.version>0.8.2</debezium-core.version>
+ <debezium.version>0.8.2</debezium.version>
<!-- test dependencies -->
<arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index fb79196..3a178ab 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -41,13 +41,13 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
- <version>${debezium-core.version}</version>
+ <version>${debezium.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
- <version>${project.version}</version>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mysql</artifactId>
+ <version>${debezium.version}</version>
</dependency>
<dependency>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index bc97fc6..820c5a4 100644
--- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -61,7 +61,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
.withValidation(Field::isRequired);
public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
- .withDisplayName("Kafka broker addresses")
+ .withDisplayName("Pulsar broker addresses")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index e0f0574..b2d0eea 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -39,6 +39,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-debezium</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka-client.version}</version>
@@ -52,6 +58,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka-client.version}</version>
</dependency>
@@ -102,4 +114,14 @@
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 61635f8..5aac5dd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -74,21 +75,25 @@ public class KafkaConnectSource implements Source<byte[]> {
});
// get the source class name from config and create source task from reflection
- sourceTask = ((Class<? extends SourceTask>)config.get(TaskConfig.TASK_CLASS_CONFIG))
+ sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
.asSubclass(SourceTask.class)
.getDeclaredConstructor()
.newInstance();
+
// initialize the key and value converter
- keyConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))
+ keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
.asSubclass(Converter.class)
.getDeclaredConstructor()
.newInstance();
- valueConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG))
+ valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
.asSubclass(Converter.class)
.getDeclaredConstructor()
.newInstance();
+ keyConverter.configure(config, true);
+ valueConverter.configure(config, false);
+
offsetStore = new PulsarOffsetBackingStore();
offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
offsetStore.start();
@@ -117,7 +122,12 @@ public class KafkaConnectSource implements Source<byte[]> {
while (true) {
if (currentBatch == null) {
flushFuture = new CompletableFuture<>();
- currentBatch = sourceTask.poll().iterator();
+ List<SourceRecord> recordList = sourceTask.poll();
+ if (recordList == null) {
+ Thread.sleep(1000);
+ continue;
+ }
+ currentBatch = recordList.iterator();
}
if (currentBatch.hasNext()) {
return processSourceRecord(currentBatch.next());
@@ -126,7 +136,7 @@ public class KafkaConnectSource implements Source<byte[]> {
synchronized (this) {
hasOutstandingRecords = !outstandingRecords.isEmpty();
}
- if (hasOutstandingRecords) {
+ if (!hasOutstandingRecords) {
// there is no records any more, then waiting for the batch to complete writing
// to sink and the offsets are committed as well
flushFuture.get();
@@ -161,7 +171,7 @@ public class KafkaConnectSource implements Source<byte[]> {
@Override
public Optional<Long> getEventTime() {
- return Optional.of(srcRecord.timestamp());
+ return Optional.ofNullable(srcRecord.timestamp());
}
@Override
@@ -202,6 +212,11 @@ public class KafkaConnectSource implements Source<byte[]> {
flushFuture.completeExceptionally(new Exception("Sink Error"));
}
}
+
+ @Override
+ public Optional<String> getDestinationTopic() {
+ return Optional.of(srcRecord.topic());
+ }
};
}
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..4887063
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: kafka-connect-adaptor
+description: Kafka source connect adaptor
+sourceClass: org.apache.pulsar.io.kafka.connect.KafkaConnectSource
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
new file mode 100644
index 0000000..2e288cb
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "test"
+namespace: "test-namespace"
+name: "debezium-kafka-source"
+
+##autoAck: true
+parallelism: 1
+
+configs:
+ ## sourceTask
+ task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
+
+ ## config for mysql, docker image: debezium/example-mysql:0.8
+ database.hostname: "localhost"
+ database.port: "3306"
+ database.user: "debezium"
+ database.password: "dbz"
+ database.server.id: "184054"
+ database.server.name: "dbserver1"
+ database.whitelist: "inventory"
+
+ database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
+ database.history.pulsar.topic: "history-topic"
+ database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
+ key.converter: "org.apache.kafka.connect.json.JsonConverter"
+ value.converter: "org.apache.kafka.connect.json.JsonConverter"
+ ## PULSAR_SERVICE_URL_CONFIG
+ pulsar.service.url: "pulsar://127.0.0.1:6650"
+ ## OFFSET_STORAGE_TOPIC_CONFIG
+ offset.storage.topic: "offset-topic"
+
+
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index b82daab..d0f0a63 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -70,9 +70,9 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
super.internalSetup();
super.producerBaseSetup();
- config.put(TaskConfig.TASK_CLASS_CONFIG, org.apache.kafka.connect.file.FileStreamSourceTask.class);
- config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
- config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
+ config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+ config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+ config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());