You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/29 06:41:37 UTC

[GitHub] sijie closed pull request #2791: Debezium: integrate KafkaConnectSource with MySqlConnectorTask

sijie closed pull request #2791: Debezium: integrate KafkaConnectSource with MySqlConnectorTask
URL: https://github.com/apache/pulsar/pull/2791
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index c527fc70ea..1a190a47ca 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -93,5 +93,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      <source>${basedir}/../../pulsar-io/kafka-connect-adaptor/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/pom.xml b/pom.xml
index eac83c073f..ef870ab4e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <debezium-core.version>0.8.2</debezium-core.version>
+    <debezium.version>0.8.2</debezium.version>
 
     <!-- test dependencies -->
     <arquillian-cube.version>1.15.1</arquillian-cube.version>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index fb791960c3..3a178abcd3 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -41,13 +41,13 @@
     <dependency>
       <groupId>io.debezium</groupId>
       <artifactId>debezium-core</artifactId>
-      <version>${debezium-core.version}</version>
+      <version>${debezium.version}</version>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
-      <version>${project.version}</version>
+      <groupId>io.debezium</groupId>
+      <artifactId>debezium-connector-mysql</artifactId>
+      <version>${debezium.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index bc97fc6353..820c5a4eb2 100644
--- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -61,7 +61,7 @@
         .withValidation(Field::isRequired);
 
     public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
-        .withDisplayName("Kafka broker addresses")
+        .withDisplayName("Pulsar broker addresses")
         .withType(Type.STRING)
         .withWidth(Width.LONG)
         .withImportance(Importance.HIGH)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index e0f05742a9..b2d0eeab3c 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -38,6 +38,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-debezium</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
@@ -50,6 +56,12 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-json</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-api</artifactId>
@@ -102,4 +114,14 @@
 
   </dependencies>
 
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 61635f80ec..5aac5ddf3c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -74,21 +75,25 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
         });
 
         // get the source class name from config and create source task from reflection
-        sourceTask = ((Class<? extends SourceTask>)config.get(TaskConfig.TASK_CLASS_CONFIG))
+        sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
             .asSubclass(SourceTask.class)
             .getDeclaredConstructor()
             .newInstance();
 
+
         // initialize the key and value converter
-        keyConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))
+        keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
             .asSubclass(Converter.class)
             .getDeclaredConstructor()
             .newInstance();
-        valueConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG))
+        valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
             .asSubclass(Converter.class)
             .getDeclaredConstructor()
             .newInstance();
 
+        keyConverter.configure(config, true);
+        valueConverter.configure(config, false);
+
         offsetStore = new PulsarOffsetBackingStore();
         offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
         offsetStore.start();
@@ -117,7 +122,12 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
         while (true) {
             if (currentBatch == null) {
                 flushFuture = new CompletableFuture<>();
-                currentBatch = sourceTask.poll().iterator();
+                List<SourceRecord> recordList =  sourceTask.poll();
+                if (recordList == null) {
+                    Thread.sleep(1000);
+                    continue;
+                }
+                currentBatch = recordList.iterator();
             }
             if (currentBatch.hasNext()) {
                 return processSourceRecord(currentBatch.next());
@@ -126,7 +136,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
                 synchronized (this) {
                     hasOutstandingRecords = !outstandingRecords.isEmpty();
                 }
-                if (hasOutstandingRecords) {
+                if (!hasOutstandingRecords) {
                     // there is no records any more, then waiting for the batch to complete writing
                     // to sink and the offsets are committed as well
                     flushFuture.get();
@@ -161,7 +171,7 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
 
             @Override
             public Optional<Long> getEventTime() {
-                return Optional.of(srcRecord.timestamp());
+                return Optional.ofNullable(srcRecord.timestamp());
             }
 
             @Override
@@ -202,6 +212,11 @@ public void fail() {
                     flushFuture.completeExceptionally(new Exception("Sink Error"));
                 }
             }
+
+            @Override
+            public Optional<String> getDestinationTopic() {
+                return Optional.of(srcRecord.topic());
+            }
         };
     }
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000000..4887063c1e
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: kafka-connect-adaptor
+description: Kafka source connect adaptor
+sourceClass: org.apache.pulsar.io.kafka.connect.KafkaConnectSource
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
new file mode 100644
index 0000000000..2e288cb63f
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "test"
+namespace: "test-namespace"
+name: "debezium-kafka-source"
+
+##autoAck: true
+parallelism: 1
+
+configs:
+  ## sourceTask
+  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
+
+  ## config for mysql, docker image: debezium/example-mysql:0.8
+  database.hostname: "localhost"
+  database.port: "3306"
+  database.user: "debezium"
+  database.password: "dbz"
+  database.server.id: "184054"
+  database.server.name: "dbserver1"
+  database.whitelist: "inventory"
+
+  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
+  database.history.pulsar.topic: "history-topic"
+  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
+  key.converter: "org.apache.kafka.connect.json.JsonConverter"
+  value.converter: "org.apache.kafka.connect.json.JsonConverter"
+  ## PULSAR_SERVICE_URL_CONFIG
+  pulsar.service.url: "pulsar://127.0.0.1:6650"
+  ## OFFSET_STORAGE_TOPIC_CONFIG
+  offset.storage.topic: "offset-topic"
+
+
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index b82daab233..d0f0a633f4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -70,9 +70,9 @@ protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
 
-        config.put(TaskConfig.TASK_CLASS_CONFIG, org.apache.kafka.connect.file.FileStreamSourceTask.class);
-        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
-        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
 
         this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
         config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services