Posted to commits@pulsar.apache.org by si...@apache.org on 2020/05/28 17:34:42 UTC

[pulsar] branch master updated: Split KafkaConnectSource into an abstract and impl (#6911)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e39a2  Split KafkaConnectSource into an abstract and impl (#6911)
86e39a2 is described below

commit 86e39a25e961d148a485be4a7c9fc186d5b8f8bc
Author: Nathan Mills <na...@instructure.com>
AuthorDate: Thu May 28 11:34:31 2020 -0600

    Split KafkaConnectSource into an abstract and impl (#6911)
    
    ### Motivation
    
    We perform extra processing steps and return a different Record type than the one provided by KafkaConnectSource, but we still want to reuse most of its code. An abstract implementation lets us share that code while customizing the Record type and the processSourceRecord method.
    
    ### Modifications
    
    KafkaConnectSource was split into AbstractKafkaConnectSource, with KafkaConnectSource now extending the abstract class. The processSourceRecord method was made public and abstract so that subclasses can override it. The inner class KafkaSourceRecord was likewise split into an abstract AbstractKafkaSourceRecord to allow different Record types; a sketch of a possible subclass is shown below.
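    
    For illustration, here is a minimal, hypothetical sketch (not part of this commit) of how a downstream connector could extend the new abstract class to emit a different Record type. The names StringKafkaConnectSource and StringSourceRecord are invented for the example, and it assumes the subclass lives in the same package so it can reach the package-private fields of AbstractKafkaSourceRecord:
    
    ```java
    package org.apache.pulsar.io.kafka.connect;
    
    import java.util.Optional;
    import org.apache.kafka.connect.source.SourceRecord;
    
    // Hypothetical example only: emits plain String records instead of KeyValue<byte[], byte[]>.
    public class StringKafkaConnectSource extends AbstractKafkaConnectSource<String> {
    
        @Override
        public synchronized AbstractKafkaSourceRecord<String> processSourceRecord(SourceRecord srcRecord) {
            StringSourceRecord record = new StringSourceRecord(srcRecord);
            // Offsets are still tracked through the writer exposed by the base class.
            offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
            return record;
        }
    
        private class StringSourceRecord extends AbstractKafkaSourceRecord<String> {
            StringSourceRecord(SourceRecord srcRecord) {
                super(srcRecord);
                this.topicName = Optional.of(srcRecord.topic());
                this.eventTime = Optional.ofNullable(srcRecord.timestamp());
                // Custom conversion step: render the Connect value as a String.
                // A null value leaves the record empty, so the base class skips it.
                this.value = srcRecord.value() == null ? null : srcRecord.value().toString();
            }
        }
    }
    ```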
    
    ### Verifying this change
    
    This change is already covered by existing tests.
    
    The change is purely a split of existing code; there should be no behavioral changes to the KafkaConnectSource class.
---
 ...Source.java => AbstractKafkaConnectSource.java} | 187 ++++----------
 .../io/kafka/connect/KafkaConnectSource.java       | 270 ++-------------------
 2 files changed, 67 insertions(+), 390 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
similarity index 58%
copy from pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
copy to pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index a178b4b..38f6a70 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -18,62 +18,40 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
-import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
-
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.functions.api.KVRecord;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
-import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
-import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.*;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter;
 import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
 
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
+
 /**
  * A pulsar source that runs
  */
 @Slf4j
-public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
+public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     // kafka connect related variables
     private SourceTaskContext sourceTaskContext;
     @Getter
     private SourceTask sourceTask;
-    private Converter keyConverter;
-    private Converter valueConverter;
+    public Converter keyConverter;
+    public Converter valueConverter;
 
     // pulsar io related variables
     private Iterator<SourceRecord> currentBatch = null;
@@ -82,17 +60,10 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
     private OffsetStorageReader offsetReader;
     private String topicNamespace;
     @Getter
-    private OffsetStorageWriter offsetWriter;
+    public OffsetStorageWriter offsetWriter;
     // number of outstandingRecords that have been polled but not been acked
     private AtomicInteger outstandingRecords = new AtomicInteger(0);
 
-    private boolean jsonWithEnvelope = false;
-    static private final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
-
-    private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchemaWrappedSchema> readerCache =
-            CacheBuilder.newBuilder().maximumSize(10000)
-                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
-
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         Map<String, String> stringConfig = new HashMap<>();
@@ -102,31 +73,23 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             }
         });
 
-        if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
-            jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
-            config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, jsonWithEnvelope);
-        } else {
-            config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
-        }
-        log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
-
         // get the source class name from config and create source task from reflection
-        sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
-            .asSubclass(SourceTask.class)
-            .getDeclaredConstructor()
-            .newInstance();
+        sourceTask = ((Class<? extends SourceTask>) Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
+                .asSubclass(SourceTask.class)
+                .getDeclaredConstructor()
+                .newInstance();
 
         topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);
 
         // initialize the key and value converter
-        keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
-            .asSubclass(Converter.class)
-            .getDeclaredConstructor()
-            .newInstance();
-        valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
-            .asSubclass(Converter.class)
-            .getDeclaredConstructor()
-            .newInstance();
+        keyConverter = ((Class<? extends Converter>) Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
+                .asSubclass(Converter.class)
+                .getDeclaredConstructor()
+                .newInstance();
+        valueConverter = ((Class<? extends Converter>) Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
+                .asSubclass(Converter.class)
+                .getDeclaredConstructor()
+                .newInstance();
 
         if (keyConverter instanceof AvroConverter) {
             keyConverter = new AvroConverter(new MockSchemaRegistryClient());
@@ -145,16 +108,16 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
         offsetStore.start();
 
         offsetReader = new OffsetStorageReaderImpl(
-            offsetStore,
-            "pulsar-kafka-connect-adaptor",
-            keyConverter,
-            valueConverter
+                offsetStore,
+                "pulsar-kafka-connect-adaptor",
+                keyConverter,
+                valueConverter
         );
         offsetWriter = new OffsetStorageWriter(
-            offsetStore,
-            "pulsar-kafka-connect-adaptor",
-            keyConverter,
-            valueConverter
+                offsetStore,
+                "pulsar-kafka-connect-adaptor",
+                keyConverter,
+                valueConverter
         );
 
         sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
@@ -164,7 +127,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
     }
 
     @Override
-    public synchronized Record<KeyValue<byte[], byte[]>> read() throws Exception {
+    public synchronized Record<T> read() throws Exception {
         while (true) {
             if (currentBatch == null) {
                 flushFuture = new CompletableFuture<>();
@@ -177,8 +140,8 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
                 currentBatch = recordList.iterator();
             }
             if (currentBatch.hasNext()) {
-                Record<KeyValue<byte[], byte[]>> processRecord = processSourceRecord(currentBatch.next());
-                if (processRecord.getValue().getValue() == null) {
+                AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
+                if (processRecord.isEmpty()) {
                     outstandingRecords.decrementAndGet();
                     continue;
                 } else {
@@ -201,21 +164,17 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
         }
     }
 
-    private synchronized Record<KeyValue<byte[], byte[]>> processSourceRecord(final SourceRecord srcRecord) {
-        KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
-        offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
-        return record;
-    }
+    public abstract AbstractKafkaSourceRecord<T> processSourceRecord(final SourceRecord srcRecord);
 
     private static Map<String, String> PROPERTIES = Collections.emptyMap();
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
     private static long FLUSH_TIMEOUT_MS = 2000;
 
-    private class KafkaSourceRecord implements KVRecord<byte[], byte[]> {
+    public abstract class AbstractKafkaSourceRecord<T> implements Record {
         @Getter
         Optional<String> key;
         @Getter
-        KeyValue<byte[], byte[]> value;
+        T value;
         @Getter
         Optional<String> topicName;
         @Getter
@@ -229,75 +188,11 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
 
         KafkaSchemaWrappedSchema valueSchema;
 
-        KafkaSourceRecord(SourceRecord srcRecord) {
-            AvroData avroData = new AvroData(1000);
-            byte[] keyBytes = keyConverter.fromConnectData(
-                    srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
-            this.key = keyBytes != null ? Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
-
-            byte[] valueBytes = valueConverter.fromConnectData(
-                    srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
-
-            this.value = new KeyValue<>(keyBytes, valueBytes);
-
-            this.topicName = Optional.of(srcRecord.topic());
-
-            if (srcRecord.keySchema() != null) {
-                keySchema = readerCache.getIfPresent(srcRecord.keySchema());
-            }
-            if (srcRecord.valueSchema() != null) {
-                valueSchema = readerCache.getIfPresent(srcRecord.valueSchema());
-            }
-
-            if (srcRecord.keySchema() != null && keySchema == null) {
-                keySchema = new KafkaSchemaWrappedSchema(
-                        avroData.fromConnectSchema(srcRecord.keySchema()), keyConverter);
-                readerCache.put(srcRecord.keySchema(), keySchema);
-            }
-
-            if (srcRecord.valueSchema() != null && valueSchema == null) {
-                valueSchema = new KafkaSchemaWrappedSchema(
-                        avroData.fromConnectSchema(srcRecord.valueSchema()), valueConverter);
-                readerCache.put(srcRecord.valueSchema(), valueSchema);
-            }
-
-            this.eventTime = Optional.ofNullable(srcRecord.timestamp());
-            this.partitionId = Optional.of(srcRecord.sourcePartition()
-                .entrySet()
-                .stream()
-                .map(e -> e.getKey() + "=" + e.getValue())
-                .collect(Collectors.joining(",")));
+        AbstractKafkaSourceRecord(SourceRecord srcRecord) {
             this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
         }
 
         @Override
-        public Schema<byte[]> getKeySchema() {
-            if (jsonWithEnvelope || keySchema == null) {
-                return Schema.BYTES;
-            } else {
-                return keySchema;
-            }
-        }
-
-        @Override
-        public Schema<byte[]> getValueSchema() {
-            if (jsonWithEnvelope || valueSchema == null) {
-                return Schema.BYTES;
-            } else {
-                return valueSchema;
-            }
-        }
-
-        @Override
-        public KeyValueEncodingType getKeyValueEncodingType() {
-            if (jsonWithEnvelope) {
-                return KeyValueEncodingType.INLINE;
-            } else {
-                return KeyValueEncodingType.SEPARATED;
-            }
-        }
-
-        @Override
         public Schema getSchema() {
             return null;
         }
@@ -312,6 +207,10 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             return PROPERTIES;
         }
 
+        public boolean isEmpty() {
+            return this.value == null;
+        }
+
         private void completedFlushOffset(Throwable error, Void result) {
             if (error != null) {
                 log.error("Failed to flush offsets to storage: ", error);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index a178b4b..47214ae 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -18,90 +18,40 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
-import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG;
-
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.functions.api.KVRecord;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter;
-import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.source.SourceTaskContext;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
-import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
-import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
-import org.apache.pulsar.kafka.shade.io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import org.apache.pulsar.kafka.shade.io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
+
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * A pulsar source that runs
  */
 @Slf4j
-public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
-
-    // kafka connect related variables
-    private SourceTaskContext sourceTaskContext;
-    @Getter
-    private SourceTask sourceTask;
-    private Converter keyConverter;
-    private Converter valueConverter;
-
-    // pulsar io related variables
-    private Iterator<SourceRecord> currentBatch = null;
-    private CompletableFuture<Void> flushFuture;
-    private OffsetBackingStore offsetStore;
-    private OffsetStorageReader offsetReader;
-    private String topicNamespace;
-    @Getter
-    private OffsetStorageWriter offsetWriter;
-    // number of outstandingRecords that have been polled but not been acked
-    private AtomicInteger outstandingRecords = new AtomicInteger(0);
-
-    private boolean jsonWithEnvelope = false;
-    static private final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
+public class KafkaConnectSource extends AbstractKafkaConnectSource<KeyValue<byte[], byte[]>> {
 
     private final Cache<org.apache.kafka.connect.data.Schema, KafkaSchemaWrappedSchema> readerCache =
             CacheBuilder.newBuilder().maximumSize(10000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        Map<String, String> stringConfig = new HashMap<>();
-        config.forEach((key, value) -> {
-            if (value instanceof String) {
-                stringConfig.put(key, (String) value);
-            }
-        });
+    private boolean jsonWithEnvelope = false;
+    static private final String JSON_WITH_ENVELOPE_CONFIG = "json-with-envelope";
 
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         if (config.get(JSON_WITH_ENVELOPE_CONFIG) != null) {
             jsonWithEnvelope = Boolean.parseBoolean(config.get(JSON_WITH_ENVELOPE_CONFIG).toString());
             config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, jsonWithEnvelope);
@@ -109,99 +59,11 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
         }
         log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
-
-        // get the source class name from config and create source task from reflection
-        sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)))
-            .asSubclass(SourceTask.class)
-            .getDeclaredConstructor()
-            .newInstance();
-
-        topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG);
-
-        // initialize the key and value converter
-        keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
-            .asSubclass(Converter.class)
-            .getDeclaredConstructor()
-            .newInstance();
-        valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)))
-            .asSubclass(Converter.class)
-            .getDeclaredConstructor()
-            .newInstance();
-
-        if (keyConverter instanceof AvroConverter) {
-            keyConverter = new AvroConverter(new MockSchemaRegistryClient());
-            config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
-        }
-        if (valueConverter instanceof AvroConverter) {
-            valueConverter = new AvroConverter(new MockSchemaRegistryClient());
-            config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock");
-        }
-        keyConverter.configure(config, true);
-        valueConverter.configure(config, false);
-
-        offsetStore = new PulsarOffsetBackingStore();
-        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
-        offsetStore.configure(pulsarKafkaWorkerConfig);
-        offsetStore.start();
-
-        offsetReader = new OffsetStorageReaderImpl(
-            offsetStore,
-            "pulsar-kafka-connect-adaptor",
-            keyConverter,
-            valueConverter
-        );
-        offsetWriter = new OffsetStorageWriter(
-            offsetStore,
-            "pulsar-kafka-connect-adaptor",
-            keyConverter,
-            valueConverter
-        );
-
-        sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
-
-        sourceTask.initialize(sourceTaskContext);
-        sourceTask.start(stringConfig);
-    }
-
-    @Override
-    public synchronized Record<KeyValue<byte[], byte[]>> read() throws Exception {
-        while (true) {
-            if (currentBatch == null) {
-                flushFuture = new CompletableFuture<>();
-                List<SourceRecord> recordList = sourceTask.poll();
-                if (recordList == null || recordList.isEmpty()) {
-                    Thread.sleep(1000);
-                    continue;
-                }
-                outstandingRecords.addAndGet(recordList.size());
-                currentBatch = recordList.iterator();
-            }
-            if (currentBatch.hasNext()) {
-                Record<KeyValue<byte[], byte[]>> processRecord = processSourceRecord(currentBatch.next());
-                if (processRecord.getValue().getValue() == null) {
-                    outstandingRecords.decrementAndGet();
-                    continue;
-                } else {
-                    return processRecord;
-                }
-            } else {
-                // there is no records any more, then waiting for the batch to complete writing
-                // to sink and the offsets are committed as well, then do next round read.
-                flushFuture.get();
-                flushFuture = null;
-                currentBatch = null;
-            }
-        }
+        super.open(config, sourceContext);
     }
 
-    @Override
-    public void close() {
-        if (sourceTask != null) {
-            sourceTask.stop();
-        }
-    }
 
-    private synchronized Record<KeyValue<byte[], byte[]>> processSourceRecord(final SourceRecord srcRecord) {
+    public synchronized KafkaSourceRecord processSourceRecord(final SourceRecord srcRecord) {
         KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
         offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
         return record;
@@ -211,25 +73,10 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
     private static long FLUSH_TIMEOUT_MS = 2000;
 
-    private class KafkaSourceRecord implements KVRecord<byte[], byte[]> {
-        @Getter
-        Optional<String> key;
-        @Getter
-        KeyValue<byte[], byte[]> value;
-        @Getter
-        Optional<String> topicName;
-        @Getter
-        Optional<Long> eventTime;
-        @Getter
-        Optional<String> partitionId;
-        @Getter
-        Optional<String> destinationTopic;
-
-        KafkaSchemaWrappedSchema keySchema;
-
-        KafkaSchemaWrappedSchema valueSchema;
+    private class KafkaSourceRecord extends AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>> implements KVRecord<byte[], byte[]> {
 
         KafkaSourceRecord(SourceRecord srcRecord) {
+            super(srcRecord);
             AvroData avroData = new AvroData(1000);
             byte[] keyBytes = keyConverter.fromConnectData(
                     srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
@@ -267,7 +114,11 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
                 .stream()
                 .map(e -> e.getKey() + "=" + e.getValue())
                 .collect(Collectors.joining(",")));
-            this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic());
+        }
+
+        @Override
+        public boolean isEmpty(){
+            return this.value.getValue() == null;
         }
 
         @Override
@@ -297,79 +148,6 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             }
         }
 
-        @Override
-        public Schema getSchema() {
-            return null;
-        }
-
-        @Override
-        public Optional<Long> getRecordSequence() {
-            return RECORD_SEQUENCE;
-        }
-
-        @Override
-        public Map<String, String> getProperties() {
-            return PROPERTIES;
-        }
-
-        private void completedFlushOffset(Throwable error, Void result) {
-            if (error != null) {
-                log.error("Failed to flush offsets to storage: ", error);
-                currentBatch = null;
-                offsetWriter.cancelFlush();
-                flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
-            } else {
-                log.trace("Finished flushing offsets to storage");
-                currentBatch = null;
-                flushFuture.complete(null);
-            }
-        }
-
-        @Override
-        public void ack() {
-            // TODO: should flush for each batch. not wait for a time for acked all.
-            // How to handle order between each batch. QueueList<pair<batch, automicInt>>. check if head is all acked.
-            boolean canFlush = (outstandingRecords.decrementAndGet() == 0);
-
-            // consumed all the records, flush the offsets
-            if (canFlush && flushFuture != null) {
-                if (!offsetWriter.beginFlush()) {
-                    log.error("When beginFlush, No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
-                    return;
-                }
-
-                Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
-                if (doFlush == null) {
-                    // Offsets added in processSourceRecord, But here no offsets to commit
-                    log.error("No offsets to commit!");
-                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
-                    return;
-                }
-
-                // Wait until the offsets are flushed
-                try {
-                    doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                    sourceTask.commit();
-                } catch (InterruptedException e) {
-                    log.warn("Flush of {} offsets interrupted, cancelling", this);
-                    offsetWriter.cancelFlush();
-                } catch (ExecutionException e) {
-                    log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
-                    offsetWriter.cancelFlush();
-                } catch (TimeoutException e) {
-                    log.error("Timed out waiting to flush {} offsets to storage", this);
-                    offsetWriter.cancelFlush();
-                }
-            }
-        }
-
-        @Override
-        public void fail() {
-            if (flushFuture != null) {
-                flushFuture.completeExceptionally(new Exception("Sink Error"));
-            }
-        }
     }
 
 }