Posted to commits@flink.apache.org by jq...@apache.org on 2021/12/26 11:43:16 UTC

[flink] branch release-1.14 updated: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ebbf772  [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer
ebbf772 is described below

commit ebbf772ea287ee987f5eb628ad2e395895b312aa
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Dec 2 08:51:26 2021 +0800

    [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer
---
 .../flink/connector/kafka/source/KafkaSource.java  |  20 +--
 .../source/reader/KafkaPartitionSplitReader.java   | 186 +++++++--------------
 .../kafka/source/reader/KafkaRecordEmitter.java    |  50 +++++-
 .../kafka/source/reader/KafkaSourceReader.java     |  14 +-
 .../reader/fetcher/KafkaSourceFetcherManager.java  |  21 ++-
 .../connector/kafka/source/KafkaSourceITCase.java  |  65 ++++---
 .../reader/KafkaPartitionSplitReaderTest.java      |  53 +++---
 7 files changed, 201 insertions(+), 208 deletions(-)
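
As an illustration of what this change enables (deserialization now runs in the KafkaRecordEmitter on the task thread rather than in the split fetcher, so a reused output object is handed to the downstream operator before the next record is deserialized instead of being buffered and overwritten in the fetcher's element queue), a minimal object-reusing deserializer could look like the sketch below. This sketch is not part of the commit, and the class name ReusingStringDeserializationSchema is hypothetical.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.nio.charset.StandardCharsets;

    /** Illustrative sketch only: reuses one mutable StringBuilder instance for every record. */
    public class ReusingStringDeserializationSchema
            implements KafkaRecordDeserializationSchema<StringBuilder> {

        private static final long serialVersionUID = 1L;

        // Single mutable instance reused across records. This is only safe because, after this
        // commit, deserialization happens in the RecordEmitter on the task thread.
        private transient StringBuilder reuse;

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<StringBuilder> out) {
            if (reuse == null) {
                reuse = new StringBuilder();
            }
            reuse.setLength(0);
            reuse.append(new String(record.value(), StandardCharsets.UTF_8));
            out.collect(reuse);
        }

        @Override
        public TypeInformation<StringBuilder> getProducedType() {
            return TypeInformation.of(StringBuilder.class);
        }
    }

Without object reuse, the same schema would simply allocate a new output object per record; the parameterized tests added to KafkaSourceITCase below exercise both behaviors via the enableObjectReuse flag.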

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 6df7d2f..400e803 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -49,6 +48,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -131,8 +132,8 @@ public class KafkaSource<OUT>
     SourceReader<OUT, KafkaPartitionSplit> createReader(
             SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
-                new FutureCompletingBlockingQueue<>();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+                elementsQueue = new FutureCompletingBlockingQueue<>();
         deserializationSchema.open(
                 new DeserializationSchema.InitializationContext() {
                     @Override
@@ -148,18 +149,13 @@ public class KafkaSource<OUT>
         final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                 new KafkaSourceReaderMetrics(readerContext.metricGroup());
 
-        Supplier<KafkaPartitionSplitReader<OUT>> splitReaderSupplier =
-                () ->
-                        new KafkaPartitionSplitReader<>(
-                                props,
-                                deserializationSchema,
-                                readerContext,
-                                kafkaSourceReaderMetrics);
-        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>();
+        Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
+                () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
+        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
 
         return new KafkaSourceReader<>(
                 elementsQueue,
-                new KafkaSourceFetcherManager<>(
+                new KafkaSourceFetcherManager(
                         elementsQueue, splitReaderSupplier::get, splitFinishedHook),
                 recordEmitter,
                 toConfiguration(props),
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index d048230..ebadef3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -19,16 +19,13 @@
 package org.apache.flink.connector.kafka.source.reader;
 
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -59,22 +56,14 @@ import java.util.Set;
 import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
-/**
- * A {@link SplitReader} implementation that reads records from Kafka partitions.
- *
- * <p>The returned type are in the format of {@code tuple3(record, offset and timestamp}.
- *
- * @param <T> the type of the record to be emitted from the Source.
- */
-public class KafkaPartitionSplitReader<T>
-        implements SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
+/** A {@link SplitReader} implementation that reads records from Kafka partitions. */
+public class KafkaPartitionSplitReader
+        implements SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
     private static final long POLL_TIMEOUT = 10000L;
 
     private final KafkaConsumer<byte[], byte[]> consumer;
-    private final KafkaRecordDeserializationSchema<T> deserializationSchema;
     private final Map<TopicPartition, Long> stoppingOffsets;
-    private final SimpleCollector<T> collector;
     private final String groupId;
     private final int subtaskId;
 
@@ -85,7 +74,6 @@ public class KafkaPartitionSplitReader<T>
 
     public KafkaPartitionSplitReader(
             Properties props,
-            KafkaRecordDeserializationSchema<T> deserializationSchema,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
         this.subtaskId = context.getIndexOfSubtask();
@@ -95,8 +83,6 @@ public class KafkaPartitionSplitReader<T>
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
         this.consumer = new KafkaConsumer<>(consumerProps);
         this.stoppingOffsets = new HashMap<>();
-        this.deserializationSchema = deserializationSchema;
-        this.collector = new SimpleCollector<>();
         this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
 
         // Metric registration
@@ -105,84 +91,39 @@ public class KafkaPartitionSplitReader<T>
     }
 
     @Override
-    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
-        KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
-                new KafkaPartitionSplitRecords<>();
+    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
         } catch (WakeupException we) {
-            recordsBySplits.prepareForRead();
-            return recordsBySplits;
+            return new KafkaPartitionSplitRecords(
+                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
         }
-
+        KafkaPartitionSplitRecords recordsBySplits =
+                new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
         List<TopicPartition> finishedPartitions = new ArrayList<>();
         for (TopicPartition tp : consumerRecords.partitions()) {
             long stoppingOffset = getStoppingOffset(tp);
-            String splitId = tp.toString();
-            Collection<Tuple3<T, Long, Long>> recordsForSplit =
-                    recordsBySplits.recordsForSplit(splitId);
             final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
                     consumerRecords.records(tp);
-            for (ConsumerRecord<byte[], byte[]> consumerRecord : recordsFromPartition) {
-                // Stop consuming from this partition if the offsets has reached the stopping
-                // offset.
-                // Note that there are two cases, either case finishes a split:
-                // 1. After processing a record with offset of "stoppingOffset - 1". The split
-                // reader
-                //    should not continue fetching because the record with stoppingOffset may not
-                // exist.
-                // 2. Before processing a record whose offset is greater than or equals to the
-                // stopping
-                //    offset. This should only happens when case 1 was not met due to log compaction
-                // or
-                //    log retention.
-                // Case 2 is handled here. Case 1 is handled after the record is processed.
-                if (consumerRecord.offset() >= stoppingOffset) {
+
+            if (recordsFromPartition.size() > 0) {
+                final ConsumerRecord<byte[], byte[]> lastRecord =
+                        recordsFromPartition.get(recordsFromPartition.size() - 1);
+
+                // After processing a record with offset "stoppingOffset - 1", the split reader
+                // should not continue fetching, because the record at stoppingOffset may not
+                // exist; continuing to poll would just block forever.
+                if (lastRecord.offset() >= stoppingOffset - 1) {
+                    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
                     finishSplitAtRecord(
                             tp,
                             stoppingOffset,
-                            consumerRecord.offset(),
+                            lastRecord.offset(),
                             finishedPartitions,
                             recordsBySplits);
-                    break;
                 }
-                // Add the record to the partition collector.
-                try {
-                    deserializationSchema.deserialize(consumerRecord, collector);
-                    collector
-                            .getRecords()
-                            .forEach(
-                                    r ->
-                                            recordsForSplit.add(
-                                                    new Tuple3<>(
-                                                            r,
-                                                            consumerRecord.offset(),
-                                                            consumerRecord.timestamp())));
-                    // Finish the split because there might not be any message after this point.
-                    // Keep polling
-                    // will just block forever.
-                    if (consumerRecord.offset() == stoppingOffset - 1) {
-                        finishSplitAtRecord(
-                                tp,
-                                stoppingOffset,
-                                consumerRecord.offset(),
-                                finishedPartitions,
-                                recordsBySplits);
-                    }
-                } catch (Exception e) {
-                    throw new IOException("Failed to deserialize consumer record due to", e);
-                } finally {
-                    collector.reset();
-                }
-            }
-
-            // Use the last record for updating offset metrics
-            if (recordsFromPartition.size() > 0) {
-                kafkaSourceReaderMetrics.recordCurrentOffset(
-                        tp, recordsFromPartition.get(recordsFromPartition.size() - 1).offset());
             }
-
             // Track this partition's record lag if it never appears before
             kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
         }
@@ -199,7 +140,6 @@ public class KafkaPartitionSplitReader<T>
             finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
             unassignPartitions(finishedPartitions);
         }
-        recordsBySplits.prepareForRead();
 
         // Update numBytesIn
         kafkaSourceReaderMetrics.updateNumBytesInCounter();
@@ -422,7 +362,7 @@ public class KafkaPartitionSplitReader<T>
             long stoppingOffset,
             long currentOffset,
             List<TopicPartition> finishedPartitions,
-            KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
+            KafkaPartitionSplitRecords recordsBySplits) {
         LOG.debug(
                 "{} has reached stopping offset {}, current offset is {}",
                 tp,
@@ -452,57 +392,67 @@ public class KafkaPartitionSplitReader<T>
 
     // ---------------- private helper class ------------------------
 
-    private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
-        private final Map<String, Collection<T>> recordsBySplits;
-        private final Set<String> finishedSplits;
-        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
-        private String currentSplitId;
-        private Iterator<T> recordIterator;
-
-        private KafkaPartitionSplitRecords() {
-            this.recordsBySplits = new HashMap<>();
-            this.finishedSplits = new HashSet<>();
+    private static class KafkaPartitionSplitRecords
+            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+        private final Set<String> finishedSplits = new HashSet<>();
+        private final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
+        private final ConsumerRecords<byte[], byte[]> consumerRecords;
+        private final KafkaSourceReaderMetrics metrics;
+        private final Iterator<TopicPartition> splitIterator;
+        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+        private TopicPartition currentTopicPartition;
+        private Long currentSplitStoppingOffset;
+
+        private KafkaPartitionSplitRecords(
+                ConsumerRecords<byte[], byte[]> consumerRecords, KafkaSourceReaderMetrics metrics) {
+            this.consumerRecords = consumerRecords;
+            this.splitIterator = consumerRecords.partitions().iterator();
+            this.metrics = metrics;
         }
 
-        private Collection<T> recordsForSplit(String splitId) {
-            return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
+        private void setPartitionStoppingOffset(
+                TopicPartition topicPartition, long stoppingOffset) {
+            stoppingOffsets.put(topicPartition, stoppingOffset);
         }
 
         private void addFinishedSplit(String splitId) {
             finishedSplits.add(splitId);
         }
 
-        private void prepareForRead() {
-            splitIterator = recordsBySplits.entrySet().iterator();
-        }
-
-        @Override
         @Nullable
+        @Override
         public String nextSplit() {
             if (splitIterator.hasNext()) {
-                Map.Entry<String, Collection<T>> entry = splitIterator.next();
-                currentSplitId = entry.getKey();
-                recordIterator = entry.getValue().iterator();
-                return currentSplitId;
+                currentTopicPartition = splitIterator.next();
+                recordIterator = consumerRecords.records(currentTopicPartition).iterator();
+                currentSplitStoppingOffset =
+                        stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
+                return currentTopicPartition.toString();
             } else {
-                currentSplitId = null;
+                currentTopicPartition = null;
                 recordIterator = null;
+                currentSplitStoppingOffset = null;
                 return null;
             }
         }
 
-        @Override
         @Nullable
-        public T nextRecordFromSplit() {
+        @Override
+        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
             Preconditions.checkNotNull(
-                    currentSplitId,
+                    currentTopicPartition,
                     "Make sure nextSplit() did not return null before "
                             + "iterate over the records split.");
             if (recordIterator.hasNext()) {
-                return recordIterator.next();
-            } else {
-                return null;
+                final ConsumerRecord<byte[], byte[]> record = recordIterator.next();
+                // Only emit records before stopping offset
+                if (record.offset() < currentSplitStoppingOffset) {
+                    metrics.recordCurrentOffset(currentTopicPartition, record.offset());
+                    return record;
+                }
             }
+            return null;
         }
 
         @Override
@@ -510,24 +460,4 @@ public class KafkaPartitionSplitReader<T>
             return finishedSplits;
         }
     }
-
-    private static class SimpleCollector<T> implements Collector<T> {
-        private final List<T> records = new ArrayList<>();
-
-        @Override
-        public void collect(T record) {
-            records.add(record);
-        }
-
-        @Override
-        public void close() {}
-
-        private List<T> getRecords() {
-            return records;
-        }
-
-        private void reset() {
-            records.clear();
-        }
-    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
index 9470559..486ef05 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
@@ -19,21 +19,61 @@
 package org.apache.flink.connector.kafka.source.reader;
 
 import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
 
 /** The {@link RecordEmitter} implementation for {@link KafkaSourceReader}. */
 public class KafkaRecordEmitter<T>
-        implements RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> {
+        implements RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> {
+
+    private final KafkaRecordDeserializationSchema<T> deserializationSchema;
+    private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
+
+    public KafkaRecordEmitter(KafkaRecordDeserializationSchema<T> deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
 
     @Override
     public void emitRecord(
-            Tuple3<T, Long, Long> element,
+            ConsumerRecord<byte[], byte[]> consumerRecord,
             SourceOutput<T> output,
             KafkaPartitionSplitState splitState)
             throws Exception {
-        output.collect(element.f0, element.f2);
-        splitState.setCurrentOffset(element.f1 + 1);
+        try {
+            sourceOutputWrapper.setSourceOutput(output);
+            sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
+            deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
+            splitState.setCurrentOffset(consumerRecord.offset() + 1);
+        } catch (Exception e) {
+            throw new IOException("Failed to deserialize consumer record due to", e);
+        }
+    }
+
+    private static class SourceOutputWrapper<T> implements Collector<T> {
+
+        private SourceOutput<T> sourceOutput;
+        private long timestamp;
+
+        @Override
+        public void collect(T record) {
+            sourceOutput.collect(record, timestamp);
+        }
+
+        @Override
+        public void close() {}
+
+        private void setSourceOutput(SourceOutput<T> sourceOutput) {
+            this.sourceOutput = sourceOutput;
+        }
+
+        private void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 4a42a48..e9a40cd 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.kafka.source.reader;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -32,6 +31,7 @@ import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcher
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -49,7 +49,7 @@ import java.util.concurrent.ConcurrentMap;
 /** The source reader for Kafka partitions. */
 public class KafkaSourceReader<T>
         extends SingleThreadMultiplexSourceReaderBase<
-                Tuple3<T, Long, Long>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
+                ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
     // These maps need to be concurrent because it will be accessed by both the main thread
     // and the split fetcher thread in the callback.
@@ -59,9 +59,11 @@ public class KafkaSourceReader<T>
     private final boolean commitOffsetsOnCheckpoint;
 
     public KafkaSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
-            KafkaSourceFetcherManager<T> kafkaSourceFetcherManager,
-            RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> recordEmitter,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+                    elementsQueue,
+            KafkaSourceFetcherManager kafkaSourceFetcherManager,
+            RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState>
+                    recordEmitter,
             Configuration config,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
@@ -134,7 +136,7 @@ public class KafkaSourceReader<T>
             return;
         }
 
-        ((KafkaSourceFetcherManager<T>) splitFetcherManager)
+        ((KafkaSourceFetcherManager) splitFetcherManager)
                 .commitOffsets(
                         committedPartitions,
                         (ignored, e) -> {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
index f4a804b..81c9635 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.kafka.source.reader.fetcher;
 
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
@@ -29,6 +28,7 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
@@ -46,8 +46,8 @@ import java.util.function.Supplier;
  * Kafka using the KafkaConsumer inside the {@link
  * org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader}.
  */
-public class KafkaSourceFetcherManager<T>
-        extends SingleThreadFetcherManager<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
+public class KafkaSourceFetcherManager
+        extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);
 
     /**
@@ -61,8 +61,10 @@ public class KafkaSourceFetcherManager<T>
      * @param splitFinishedHook Hook for handling finished splits in split fetchers.
      */
     public KafkaSourceFetcherManager(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
-            Supplier<SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit>> splitReaderSupplier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+                    elementsQueue,
+            Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit>>
+                    splitReaderSupplier,
             Consumer<Collection<String>> splitFinishedHook) {
         super(elementsQueue, splitReaderSupplier, splitFinishedHook);
     }
@@ -73,7 +75,8 @@ public class KafkaSourceFetcherManager<T>
         if (offsetsToCommit.isEmpty()) {
             return;
         }
-        SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
+        SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher =
+                fetchers.get(0);
         if (splitFetcher != null) {
             // The fetcher thread is still running. This should be the majority of the cases.
             enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
@@ -85,11 +88,11 @@ public class KafkaSourceFetcherManager<T>
     }
 
     private void enqueueOffsetsCommitTask(
-            SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher,
+            SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher,
             Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
             OffsetCommitCallback callback) {
-        KafkaPartitionSplitReader<T> kafkaReader =
-                (KafkaPartitionSplitReader<T>) splitFetcher.getSplitReader();
+        KafkaPartitionSplitReader kafkaReader =
+                (KafkaPartitionSplitReader) splitFetcher.getSplitReader();
 
         splitFetcher.enqueueTask(
                 new SplitFetcherTask() {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 47891aa4..cf8e501 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -55,6 +55,8 @@ import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -66,6 +68,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -92,9 +95,11 @@ public class KafkaSourceITCase {
             KafkaSourceTestEnv.tearDown();
         }
 
-        @Test
-        public void testTimestamp() throws Throwable {
-            final String topic = "testTimestamp";
+        @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+        @ValueSource(booleans = {false, true})
+        public void testTimestamp(boolean enableObjectReuse) throws Throwable {
+            final String topic =
+                    "testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
             final long currentTimestamp = System.currentTimeMillis();
             KafkaSourceTestEnv.createTestTopic(topic, 1, 1);
             KafkaSourceTestEnv.produceToKafka(
@@ -108,7 +113,8 @@ public class KafkaSourceITCase {
                             .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
                             .setGroupId("testTimestampAndWatermark")
                             .setTopics(topic)
-                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setDeserializer(
+                                    new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
                             .setStartingOffsets(OffsetsInitializer.earliest())
                             .setBounded(OffsetsInitializer.latest())
                             .build();
@@ -132,14 +138,16 @@ public class KafkaSourceITCase {
                     result.getAccumulatorResult("timestamp"));
         }
 
-        @Test
-        public void testBasicRead() throws Exception {
+        @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+        @ValueSource(booleans = {false, true})
+        public void testBasicRead(boolean enableObjectReuse) throws Exception {
             KafkaSource<PartitionAndValue> source =
                     KafkaSource.<PartitionAndValue>builder()
                             .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
                             .setGroupId("testBasicRead")
                             .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setDeserializer(
+                                    new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
                             .setStartingOffsets(OffsetsInitializer.earliest())
                             .setBounded(OffsetsInitializer.latest())
                             .build();
@@ -196,14 +204,16 @@ public class KafkaSourceITCase {
             assertEquals(expectedSum, actualSum.get());
         }
 
-        @Test
-        public void testRedundantParallelism() throws Exception {
+        @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+        @ValueSource(booleans = {false, true})
+        public void testRedundantParallelism(boolean enableObjectReuse) throws Exception {
             KafkaSource<PartitionAndValue> source =
                     KafkaSource.<PartitionAndValue>builder()
                             .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
                             .setGroupId("testRedundantParallelism")
                             .setTopics(Collections.singletonList(TOPIC1))
-                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setDeserializer(
+                                    new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
                             .setStartingOffsets(OffsetsInitializer.earliest())
                             .setBounded(OffsetsInitializer.latest())
                             .build();
@@ -220,13 +230,15 @@ public class KafkaSourceITCase {
             executeAndVerify(env, stream);
         }
 
-        @Test
-        public void testBasicReadWithoutGroupId() throws Exception {
+        @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+        @ValueSource(booleans = {false, true})
+        public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Exception {
             KafkaSource<PartitionAndValue> source =
                     KafkaSource.<PartitionAndValue>builder()
                             .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
                             .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setDeserializer(
+                                    new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
                             .setStartingOffsets(OffsetsInitializer.earliest())
                             .setBounded(OffsetsInitializer.latest())
                             .build();
@@ -276,8 +288,10 @@ public class KafkaSourceITCase {
 
     private static class PartitionAndValue implements Serializable {
         private static final long serialVersionUID = 4813439951036021779L;
-        private final String tp;
-        private final int value;
+        private String tp;
+        private int value;
+
+        public PartitionAndValue() {}
 
         private PartitionAndValue(TopicPartition tp, int value) {
             this.tp = tp.toString();
@@ -289,6 +303,12 @@ public class KafkaSourceITCase {
             implements KafkaRecordDeserializationSchema<PartitionAndValue> {
         private static final long serialVersionUID = -3765473065594331694L;
         private transient Deserializer<Integer> deserializer;
+        private final boolean enableObjectReuse;
+        private final PartitionAndValue reuse = new PartitionAndValue();
+
+        public TestingKafkaRecordDeserializationSchema(boolean enableObjectReuse) {
+            this.enableObjectReuse = enableObjectReuse;
+        }
 
         @Override
         public void deserialize(
@@ -297,10 +317,17 @@ public class KafkaSourceITCase {
             if (deserializer == null) {
                 deserializer = new IntegerDeserializer();
             }
-            collector.collect(
-                    new PartitionAndValue(
-                            new TopicPartition(record.topic(), record.partition()),
-                            deserializer.deserialize(record.topic(), record.value())));
+
+            if (enableObjectReuse) {
+                reuse.tp = new TopicPartition(record.topic(), record.partition()).toString();
+                reuse.value = deserializer.deserialize(record.topic(), record.value());
+                collector.collect(reuse);
+            } else {
+                collector.collect(
+                        new PartitionAndValue(
+                                new TopicPartition(record.topic(), record.partition()),
+                                deserializer.deserialize(record.topic(), record.value())));
+            }
         }
 
         @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index d4dd33f..864d8e9 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.connector.kafka.source.reader;
 
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
-import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -40,6 +37,7 @@ import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -82,6 +80,8 @@ public class KafkaPartitionSplitReaderTest {
     private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
     private static Map<TopicPartition, Long> earliestOffsets;
 
+    private final IntegerDeserializer deserializer = new IntegerDeserializer();
+
     @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
@@ -101,14 +101,14 @@ public class KafkaPartitionSplitReaderTest {
 
     @Test
     public void testHandleSplitChangesAndFetch() throws Exception {
-        KafkaPartitionSplitReader<Integer> reader = createReader();
+        KafkaPartitionSplitReader reader = createReader();
         assignSplitsAndFetchUntilFinish(reader, 0);
         assignSplitsAndFetchUntilFinish(reader, 1);
     }
 
     @Test
     public void testWakeUp() throws Exception {
-        KafkaPartitionSplitReader<Integer> reader = createReader();
+        KafkaPartitionSplitReader reader = createReader();
         TopicPartition nonExistingTopicPartition = new TopicPartition("NotExist", 0);
         assignSplits(
                 reader,
@@ -141,7 +141,7 @@ public class KafkaPartitionSplitReaderTest {
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
         final Counter numBytesInCounter =
                 operatorMetricGroup.getIOMetricGroup().getNumBytesInCounter();
-        KafkaPartitionSplitReader<Integer> reader =
+        KafkaPartitionSplitReader reader =
                 createReader(
                         new Properties(),
                         InternalSourceReaderMetricGroup.wrap(operatorMetricGroup));
@@ -180,7 +180,7 @@ public class KafkaPartitionSplitReaderTest {
         MetricListener metricListener = new MetricListener();
         final Properties props = new Properties();
         props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
-        KafkaPartitionSplitReader<Integer> reader =
+        KafkaPartitionSplitReader reader =
                 createReader(
                         props,
                         InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
@@ -217,7 +217,7 @@ public class KafkaPartitionSplitReaderTest {
 
     @Test
     public void testAssignEmptySplit() throws Exception {
-        KafkaPartitionSplitReader<Integer> reader = createReader();
+        KafkaPartitionSplitReader reader = createReader();
         final KafkaPartitionSplit normalSplit =
                 new KafkaPartitionSplit(
                         new TopicPartition(TOPIC1, 0),
@@ -231,7 +231,7 @@ public class KafkaPartitionSplitReaderTest {
         reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(normalSplit, emptySplit)));
 
         // Fetch and check empty splits is added to finished splits
-        RecordsWithSplitIds<Tuple3<Integer, Long, Long>> recordsWithSplitIds = reader.fetch();
+        RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
         assertTrue(recordsWithSplitIds.finishedSplits().contains(emptySplit.splitId()));
 
         // Assign another valid split to avoid consumer.poll() blocking
@@ -250,20 +250,20 @@ public class KafkaPartitionSplitReaderTest {
 
     // ------------------
 
-    private void assignSplitsAndFetchUntilFinish(
-            KafkaPartitionSplitReader<Integer> reader, int readerId) throws IOException {
+    private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
+            throws IOException {
         Map<String, KafkaPartitionSplit> splits =
                 assignSplits(reader, splitsByOwners.get(readerId));
 
         Map<String, Integer> numConsumedRecords = new HashMap<>();
         Set<String> finishedSplits = new HashSet<>();
         while (finishedSplits.size() < splits.size()) {
-            RecordsWithSplitIds<Tuple3<Integer, Long, Long>> recordsBySplitIds = reader.fetch();
+            RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsBySplitIds = reader.fetch();
             String splitId = recordsBySplitIds.nextSplit();
             while (splitId != null) {
                 // Collect the records in this split.
-                List<Tuple3<Integer, Long, Long>> splitFetch = new ArrayList<>();
-                Tuple3<Integer, Long, Long> record;
+                List<ConsumerRecord<byte[], byte[]>> splitFetch = new ArrayList<>();
+                ConsumerRecord<byte[], byte[]> record;
                 while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
                     splitFetch.add(record);
                 }
@@ -305,34 +305,29 @@ public class KafkaPartitionSplitReaderTest {
 
     // ------------------
 
-    private KafkaPartitionSplitReader<Integer> createReader() throws Exception {
+    private KafkaPartitionSplitReader createReader() {
         return createReader(
                 new Properties(), UnregisteredMetricsGroup.createSourceReaderMetricGroup());
     }
 
-    private KafkaPartitionSplitReader<Integer> createReader(
-            Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup)
-            throws Exception {
+    private KafkaPartitionSplitReader createReader(
+            Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
         Properties props = new Properties();
         props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         if (!additionalProperties.isEmpty()) {
             props.putAll(additionalProperties);
         }
-        KafkaRecordDeserializationSchema<Integer> deserializationSchema =
-                KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class);
-        deserializationSchema.open(new TestingDeserializationContext());
         KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                 new KafkaSourceReaderMetrics(sourceReaderMetricGroup);
-        return new KafkaPartitionSplitReader<>(
+        return new KafkaPartitionSplitReader(
                 props,
-                deserializationSchema,
                 new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
                 kafkaSourceReaderMetrics);
     }
 
     private Map<String, KafkaPartitionSplit> assignSplits(
-            KafkaPartitionSplitReader<Integer> reader, Map<String, KafkaPartitionSplit> splits) {
+            KafkaPartitionSplitReader reader, Map<String, KafkaPartitionSplit> splits) {
         SplitsChange<KafkaPartitionSplit> splitsChange =
                 new SplitsAddition<>(new ArrayList<>(splits.values()));
         reader.handleSplitsChanges(splitsChange);
@@ -342,16 +337,16 @@ public class KafkaPartitionSplitReaderTest {
     private boolean verifyConsumed(
             final KafkaPartitionSplit split,
             final long expectedStartingOffset,
-            final Collection<Tuple3<Integer, Long, Long>> consumed) {
+            final Collection<ConsumerRecord<byte[], byte[]>> consumed) {
         long expectedOffset = expectedStartingOffset;
 
-        for (Tuple3<Integer, Long, Long> record : consumed) {
+        for (ConsumerRecord<byte[], byte[]> record : consumed) {
             int expectedValue = (int) expectedOffset;
             long expectedTimestamp = expectedOffset * 1000L;
 
-            assertEquals(expectedValue, (int) record.f0);
-            assertEquals(expectedOffset, (long) record.f1);
-            assertEquals(expectedTimestamp, (long) record.f2);
+            assertEquals(expectedValue, deserializer.deserialize(record.topic(), record.value()));
+            assertEquals(expectedOffset, record.offset());
+            assertEquals(expectedTimestamp, record.timestamp());
 
             expectedOffset++;
         }