You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/03 17:46:38 UTC

[pulsar] branch master updated: add kafka source connect adaptor for debezium (#2705)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d075d2  add kafka source connect adaptor for debezium (#2705)
8d075d2 is described below

commit 8d075d25e87a0fca1298a8e662e1e22be9423059
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Thu Oct 4 01:46:32 2018 +0800

    add kafka source connect adaptor for debezium (#2705)
    
    ### Motivation
    
    add kafka source connect adaptor for debezium
    This will save data from Kafka source connectors into Pulsar.
    
    ### Modifications
    
    add class and test
    
    ### Result
    
    unit tests pass
---
 pulsar-io/debezium/pom.xml                         |   6 +
 .../{debezium => kafka-connect-adaptor}/pom.xml    |  33 ++--
 .../io/kafka/connect/KafkaConnectSource.java       | 213 +++++++++++++++++++++
 .../kafka/connect/PulsarIOSourceTaskContext.java   |  36 ++++
 .../io/kafka/connect}/PulsarKafkaWorkerConfig.java |   2 +-
 .../kafka/connect}/PulsarOffsetBackingStore.java   |   4 +-
 .../io/kafka/connect/KafkaConnectSourceTest.java   | 126 ++++++++++++
 .../connect}/PulsarOffsetBackingStoreTest.java     |   2 +-
 pulsar-io/pom.xml                                  |   1 +
 9 files changed, 401 insertions(+), 22 deletions(-)

diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 52e589c..eaa1500 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -45,6 +45,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka-client.version}</version>
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
similarity index 83%
copy from pulsar-io/debezium/pom.xml
copy to pulsar-io/kafka-connect-adaptor/pom.xml
index 52e589c..9d873f6 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -19,7 +19,7 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -27,8 +27,8 @@
     <version>2.2.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pulsar-io-debezium</artifactId>
-  <name>Pulsar IO :: Debezium</name>
+  <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
+  <name>Pulsar IO :: Kafka Connect Adaptor</name>
 
   <dependencies>
 
@@ -39,20 +39,20 @@
     </dependency>
 
     <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-core</artifactId>
-      <version>${debezium-core.version}</version>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>${kafka-client.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <artifactId>connect-runtime</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>connect-runtime</artifactId>
+      <artifactId>connect-api</artifactId>
       <version>${kafka-client.version}</version>
     </dependency>
 
@@ -70,6 +70,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-file</artifactId>
+      <version>${kafka-client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
@@ -95,14 +102,4 @@
 
   </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-maven-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-
 </project>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
new file mode 100644
index 0000000..61635f8
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A pulsar source that runs a Kafka Connect source task and publishes its records into Pulsar.
+ */
+@Slf4j
+public class KafkaConnectSource implements Source<byte[]> {
+
+    // kafka connect related variables
+    private SourceTaskContext sourceTaskContext;
+    @Getter
+    private SourceTask sourceTask;
+    private Converter keyConverter;
+    private Converter valueConverter;
+
+    // pulsar io related variables
+    private Iterator<SourceRecord> currentBatch = null;
+    private CompletableFuture<Void> flushFuture;
+    private OffsetBackingStore offsetStore;
+    private OffsetStorageReader offsetReader;
+    @Getter
+    private OffsetStorageWriter offsetWriter;
+    private IdentityHashMap<SourceRecord, SourceRecord> outstandingRecords = new IdentityHashMap<>();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        Map<String, String> stringConfig = new HashMap<>();
+        config.forEach((key, value) -> {
+            if (value instanceof String) {
+                stringConfig.put(key, (String) value);
+            }
+        });
+
+        // get the source class name from config and create source task from reflection
+        sourceTask = ((Class<? extends SourceTask>)config.get(TaskConfig.TASK_CLASS_CONFIG))
+            .asSubclass(SourceTask.class)
+            .getDeclaredConstructor()
+            .newInstance();
+
+        // initialize the key and value converter
+        keyConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))
+            .asSubclass(Converter.class)
+            .getDeclaredConstructor()
+            .newInstance();
+        valueConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG))
+            .asSubclass(Converter.class)
+            .getDeclaredConstructor()
+            .newInstance();
+
+        offsetStore = new PulsarOffsetBackingStore();
+        offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
+        offsetStore.start();
+
+        offsetReader = new OffsetStorageReaderImpl(
+            offsetStore,
+            "pulsar-kafka-connect-adaptor",
+            keyConverter,
+            valueConverter
+        );
+        offsetWriter = new OffsetStorageWriter(
+            offsetStore,
+            "pulsar-kafka-connect-adaptor",
+            keyConverter,
+            valueConverter
+        );
+
+        sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader);
+
+        sourceTask.initialize(sourceTaskContext);
+        sourceTask.start(stringConfig);
+    }
+
+    @Override
+    public Record<byte[]> read() throws Exception {
+        while (true) {
+            if (currentBatch == null) {
+                flushFuture = new CompletableFuture<>();
+                currentBatch = sourceTask.poll().iterator();
+            }
+            if (currentBatch.hasNext()) {
+                return processSourceRecord(currentBatch.next());
+            } else {
+                boolean hasOutstandingRecords;
+                synchronized (this) {
+                    hasOutstandingRecords = !outstandingRecords.isEmpty();
+                }
+                if (hasOutstandingRecords) {
+                // there are no records any more, so wait for the batch to complete writing
+                // to the sink and for the offsets to be committed as well
+                    flushFuture.get();
+                    flushFuture = null;
+                }
+                currentBatch = null;
+            }
+        }
+    }
+
+    private synchronized Record<byte[]> processSourceRecord(final SourceRecord srcRecord) {
+        outstandingRecords.put(srcRecord, srcRecord);
+        offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
+        return new Record<byte[]>() {
+            @Override
+            public Optional<String> getKey() {
+                byte[] keyBytes = keyConverter.fromConnectData(
+                    srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
+                return Optional.of(Base64.getEncoder().encodeToString(keyBytes));
+            }
+
+            @Override
+            public byte[] getValue() {
+                return valueConverter.fromConnectData(
+                    srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
+            }
+
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of(srcRecord.topic());
+            }
+
+            @Override
+            public Optional<Long> getEventTime() {
+                return Optional.of(srcRecord.timestamp());
+            }
+
+            @Override
+            public Optional<String> getPartitionId() {
+                String partitionId = srcRecord.sourcePartition()
+                    .entrySet()
+                    .stream()
+                    .map(e -> e.getKey() + "=" + e.getValue())
+                    .collect(Collectors.joining(","));
+                return Optional.of(partitionId);
+            }
+
+            @Override
+            public Optional<Long> getRecordSequence() {
+                return Optional.empty();
+            }
+
+            @Override
+            public Map<String, String> getProperties() {
+                return Collections.emptyMap();
+            }
+
+            @Override
+            public void ack() {
+                boolean canComplete;
+                synchronized (KafkaConnectSource.this) {
+                    outstandingRecords.remove(srcRecord);
+                    canComplete = outstandingRecords.isEmpty();
+                }
+                if (canComplete && flushFuture != null) {
+                    flushFuture.complete(null);
+                }
+            }
+
+            @Override
+            public void fail() {
+                if (flushFuture != null) {
+                    flushFuture.completeExceptionally(new Exception("Sink Error"));
+                }
+            }
+        };
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        sourceTask.stop();
+    }
+}
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
new file mode 100644
index 0000000..561a090
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+class PulsarIOSourceTaskContext implements SourceTaskContext {
+
+    private final OffsetStorageReader reader;
+
+    PulsarIOSourceTaskContext(OffsetStorageReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+}
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
similarity index 97%
rename from pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java
rename to pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
index a4f65e2..d00f776 100644
--- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.debezium;
+package org.apache.pulsar.io.kafka.connect;
 
 import java.util.Map;
 import org.apache.kafka.common.config.ConfigDef;
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
similarity index 99%
rename from pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java
rename to pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 80d37bd..d7daf82 100644
--- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.debezium;
+package org.apache.pulsar.io.kafka.connect;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -66,6 +66,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
         checkArgument(!isBlank(serviceUrl), "Pulsar service url must be specified at `"
             + WorkerConfig.BOOTSTRAP_SERVERS_CONFIG + "`");
         this.data = new HashMap<>();
+
         log.info("Configure offset backing store on pulsar topic {} at cluster {}",
             topic, serviceUrl);
     }
@@ -222,7 +223,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
                 .value(valBytes)
                 .sendAsync();
         });
-
         return producer.flushAsync().whenComplete((ignored, cause) -> {
             if (null != callback) {
                 callback.onCompletion(cause, ignored);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
new file mode 100644
index 0000000..b82daab
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.file.FileStreamSourceTask;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the implementation of {@link KafkaConnectSource}.
+ */
+@Slf4j
+public class KafkaConnectSourceTest extends ProducerConsumerBase  {
+
+    private Map<String, Object> config = new HashMap<>();
+    private String offsetTopicName;
+    // The topic to publish data to, for kafkaSource
+    private String topicName;
+    private KafkaConnectSource kafkaConnectSource;
+    private File tempFile;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        config.put(TaskConfig.TASK_CLASS_CONFIG, org.apache.kafka.connect.file.FileStreamSourceTask.class);
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class);
+
+        this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
+        config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());
+        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);
+
+        this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
+        tempFile = File.createTempFile("some-file-name", null);
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        tempFile.delete();
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testOpenAndRead() throws Exception {
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, null);
+
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
+        String line1 = "This is the first line\n";
+        os.write(line1.getBytes());
+        os.flush();
+        log.info("write 2 lines.");
+        kafkaConnectSource.getOffsetWriter().offset(
+            Collections.singletonMap(FILENAME_FIELD, config.get(FileStreamSourceConnector.FILE_CONFIG).toString()),
+            Collections.singletonMap(POSITION_FIELD, /*tempFile.getTotalSpace()*/0L));
+
+        // offset of second line
+        long offset = tempFile.getTotalSpace();
+        String line2 = "This is the second line\n";
+        os.write(line2.getBytes());
+        os.flush();
+        kafkaConnectSource.getOffsetWriter().offset(
+            Collections.singletonMap(FILENAME_FIELD, config.get(FileStreamSourceConnector.FILE_CONFIG).toString()),
+            Collections.singletonMap(POSITION_FIELD, offset));
+
+        log.info("finish write, will read 2 lines");
+
+        Record<byte[]> record = kafkaConnectSource.read();
+        assertTrue(line1.contains(new String(record.getValue())));
+
+        record = kafkaConnectSource.read();
+        assertTrue(line2.contains(new String(record.getValue())));
+    }
+}
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
similarity index 99%
rename from pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java
rename to pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index 4649061..602c73d 100644
--- a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.debezium;
+package org.apache.pulsar.io.kafka.connect;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 5956d62..2fc4f9a 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -43,6 +43,7 @@
     <module>jdbc</module>
     <module>data-genenator</module>
     <module>elastic-search</module>
+    <module>kafka-connect-adaptor</module>
     <module>debezium</module>
   </modules>