You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/29 06:41:40 UTC

[pulsar] branch master updated: Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d13fe5b  Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791)
d13fe5b is described below

commit d13fe5bc287d2bc082da1682703d01dc92140a3b
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Mon Oct 29 14:41:36 2018 +0800

    Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791)
    
    ### Motivation
    This change is to integrate KafkaConnectSource with MySqlConnectorTask.
    The steps to run it are as follows:
    1. Start a MySQL server using the image from the Debezium tutorial:
    ```docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8```
    
    2. Start Pulsar standalone:
    ```bin/pulsar standalone  ```
    
    3. Start consuming data from the connect source's target topic. Once step 4 has been started, this consumer should receive data:
    ```bin/pulsar-client consume -s "my-sub-name" kafka-connect-topic -n 0```
    
    4. Start the source connector. It should produce data into Pulsar topics:
     ```bin/pulsar-admin source localrun --tenant public --namespace default --name kafka  --destination-topic-name  kafka-connect-topic --sourceConfigFile  debezium-mysql-source-config.yaml  --archive connectors/pulsar-io-kafka-connect-adaptor-2.2.0-SNAPSHOT.nar --brokerServiceUrl pulsar://127.0.0.1:6650```
---
 distribution/io/src/assemble/io.xml                |  6 +++
 pom.xml                                            |  2 +-
 pulsar-io/debezium/pom.xml                         |  8 ++--
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  |  2 +-
 pulsar-io/kafka-connect-adaptor/pom.xml            | 22 ++++++++++
 .../io/kafka/connect/KafkaConnectSource.java       | 27 +++++++++---
 .../resources/META-INF/services/pulsar-io.yaml     | 22 ++++++++++
 .../resources/debezium-mysql-source-config.yaml    | 51 ++++++++++++++++++++++
 .../io/kafka/connect/KafkaConnectSourceTest.java   |  6 +--
 9 files changed, 131 insertions(+), 15 deletions(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index c527fc7..1a190a4 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -93,5 +93,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      <source>${basedir}/../../pulsar-io/kafka-connect-adaptor/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/pom.xml b/pom.xml
index eac83c0..ef870ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <debezium-core.version>0.8.2</debezium-core.version>
+    <debezium.version>0.8.2</debezium.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index fb79196..3a178ab 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -41,13 +41,13 @@
     <dependency>
       <groupId>io.debezium</groupId>
       <artifactId>debezium-core</artifactId>
-      <version>${debezium-core.version}</version>
+      <version>${debezium.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
-      <version>${project.version}</version>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index bc97fc6..820c5a4 100644
--- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -61,7 +61,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
         .withValidation(Field::isRequired);
 
     public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
-        .withDisplayName("Kafka broker addresses")
+        .withDisplayName("Pulsar broker addresses")
         .withType(Type.STRING)
         .withWidth(Width.LONG)
         .withImportance(Importance.HIGH)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index e0f0574..b2d0eea 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -39,6 +39,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-debezium</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka-client.version}</version>
@@ -52,6 +58,12 @@
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-json</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
       <artifactId>connect-api</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
@@ -102,4 +114,14 @@
 
   </dependencies>
 
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 61635f8..5aac5dd 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -74,21 +75,25 @@ public class KafkaConnectSource implements Source<byte[]> {
         });
 
         // get the source class name from config and create source task from reflection
-        sourceTask = ((Class<? extends SourceTask>)config.get(TaskConfig.TASK_CLASS_CONFIG))
+        sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
             .asSubclass(SourceTask.class)
             .getDeclaredConstructor()
             .newInstance();
 
+
         // initialize the key and value converter
-        keyConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))
+        keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
             .asSubclass(Converter.class)
             .getDeclaredConstructor()
             .newInstance();
-        valueConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG))
+        valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
             .asSubclass(Converter.class)
             .getDeclaredConstructor()
             .newInstance();
 
+        keyConverter.configure(config, true);
+        valueConverter.configure(config, false);
+
         offsetStore = new PulsarOffsetBackingStore();
         offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
         offsetStore.start();
@@ -117,7 +122,12 @@ public class KafkaConnectSource implements Source<byte[]> {
         while (true) {
             if (currentBatch == null) {
                 flushFuture = new CompletableFuture<>();
-                currentBatch = sourceTask.poll().iterator();
+                List<SourceRecord> recordList =  sourceTask.poll();
+                if (recordList == null) {
+                    Thread.sleep(1000);
+                    continue;
+                }
+                currentBatch = recordList.iterator();
             }
             if (currentBatch.hasNext()) {
                 return processSourceRecord(currentBatch.next());
@@ -126,7 +136,7 @@ public class KafkaConnectSource implements Source<byte[]> {
                 synchronized (this) {
                     hasOutstandingRecords = !outstandingRecords.isEmpty();
                 }
-                if (hasOutstandingRecords) {
+                if (!hasOutstandingRecords) {
                     // there is no records any more, then waiting for the batch to complete writing
                     // to sink and the offsets are committed as well
                     flushFuture.get();
@@ -161,7 +171,7 @@ public class KafkaConnectSource implements Source<byte[]> {
 
             @Override
             public Optional<Long> getEventTime() {
-                return Optional.of(srcRecord.timestamp());
+                return Optional.ofNullable(srcRecord.timestamp());
             }
 
             @Override
@@ -202,6 +212,11 @@ public class KafkaConnectSource implements Source<byte[]> {
                     flushFuture.completeExceptionally(new Exception("Sink Error"));
                 }
             }
+
+            @Override
+            public Optional<String> getDestinationTopic() {
+                return Optional.of(srcRecord.topic());
+            }
         };
     }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..4887063
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: kafka-connect-adaptor
+description: Kafka source connect adaptor
+sourceClass: org.apache.pulsar.io.kafka.connect.KafkaConnectSource
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
new file mode 100644
index 0000000..2e288cb
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "test"
+namespace: "test-namespace"
+name: "debezium-kafka-source"
+
+##autoAck: true
+parallelism: 1
+
+configs:
+  ## sourceTask
+  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
+
+  ## config for mysql, docker image: debezium/example-mysql:0.8
+  database.hostname: "localhost"
+  database.port: "3306"
+  database.user: "debezium"
+  database.password: "dbz"
+  database.server.id: "184054"
+  database.server.name: "dbserver1"
+  database.whitelist: "inventory"
+
+  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
+  database.history.pulsar.topic: "history-topic"
+  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
+  key.converter: "org.apache.kafka.connect.json.JsonConverter"
+  value.converter: "org.apache.kafka.connect.json.JsonConverter"
+  ## PULSAR_SERVICE_URL_CONFIG
+  pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## OFFSET_STORAGE_TOPIC_CONFIG
+  offset.storage.topic: "offset-topic"
+
+
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index b82daab..d0f0a63 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -70,9 +70,9 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         super.internalSetup();
         super.producerBaseSetup();
 
-        config.put(TaskConfig.TASK_CLASS_CONFIG, org.apache.kafka.connect.file.FileStreamSourceTask.class);
-        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
-        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
 
         this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
         config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());