Posted to commits@flink.apache.org by jq...@apache.org on 2021/12/26 11:43:16 UTC
[flink] branch release-1.14 updated: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new ebbf772 [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer
ebbf772 is described below
commit ebbf772ea287ee987f5eb628ad2e395895b312aa
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Dec 2 08:51:26 2021 +0800
[FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deserializer
---
.../flink/connector/kafka/source/KafkaSource.java | 20 +--
.../source/reader/KafkaPartitionSplitReader.java | 186 +++++++--------------
.../kafka/source/reader/KafkaRecordEmitter.java | 50 +++++-
.../kafka/source/reader/KafkaSourceReader.java | 14 +-
.../reader/fetcher/KafkaSourceFetcherManager.java | 21 ++-
.../connector/kafka/source/KafkaSourceITCase.java | 65 ++++---
.../reader/KafkaPartitionSplitReaderTest.java | 53 +++---
7 files changed, 201 insertions(+), 208 deletions(-)
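For context on the change below: deserialization now happens in the KafkaRecordEmitter on the reader's main thread instead of inside the SplitFetcher thread, and each deserialized record is handed to the SourceOutput synchronously. That is what makes an object-reusing deserializer safe. A minimal sketch of such a KafkaRecordDeserializationSchema follows; the Event type and its fields are illustrative assumptions, not part of this commit.

    import java.io.IOException;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    /** Hypothetical object-reusing deserializer; Event is illustrative only. */
    public class ReusingEventDeserializer
            implements KafkaRecordDeserializationSchema<ReusingEventDeserializer.Event> {

        public static class Event {
            public String topic;
            public long offset;
            public byte[] payload;
        }

        // One instance reused for every record instead of allocating per record.
        private transient Event reuse;

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out)
                throws IOException {
            if (reuse == null) {
                reuse = new Event();
            }
            reuse.topic = record.topic();
            reuse.offset = record.offset();
            reuse.payload = record.value();
            // Safe after this commit: the record is forwarded to the SourceOutput
            // synchronously, so the next deserialize() call may overwrite "reuse".
            out.collect(reuse);
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }

With this change, only raw ConsumerRecord<byte[], byte[]> instances cross the fetcher-to-reader queue, so the reused object is never buffered between threads.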
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 6df7d2f..400e803 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -49,6 +48,8 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -131,8 +132,8 @@ public class KafkaSource<OUT>
SourceReader<OUT, KafkaPartitionSplit> createReader(
SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
throws Exception {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
- new FutureCompletingBlockingQueue<>();
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+ elementsQueue = new FutureCompletingBlockingQueue<>();
deserializationSchema.open(
new DeserializationSchema.InitializationContext() {
@Override
@@ -148,18 +149,13 @@ public class KafkaSource<OUT>
final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(readerContext.metricGroup());
- Supplier<KafkaPartitionSplitReader<OUT>> splitReaderSupplier =
- () ->
- new KafkaPartitionSplitReader<>(
- props,
- deserializationSchema,
- readerContext,
- kafkaSourceReaderMetrics);
- KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>();
+ Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
+ () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
+ KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
return new KafkaSourceReader<>(
elementsQueue,
- new KafkaSourceFetcherManager<>(
+ new KafkaSourceFetcherManager(
elementsQueue, splitReaderSupplier::get, splitFinishedHook),
recordEmitter,
toConfiguration(props),
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index d048230..ebadef3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -19,16 +19,13 @@
package org.apache.flink.connector.kafka.source.reader;
import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
-import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -59,22 +56,14 @@ import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
-/**
- * A {@link SplitReader} implementation that reads records from Kafka partitions.
- *
- * <p>The returned type are in the format of {@code tuple3(record, offset and timestamp}.
- *
- * @param <T> the type of the record to be emitted from the Source.
- */
-public class KafkaPartitionSplitReader<T>
- implements SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
+/** A {@link SplitReader} implementation that reads records from Kafka partitions. */
+public class KafkaPartitionSplitReader
+ implements SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
private static final long POLL_TIMEOUT = 10000L;
private final KafkaConsumer<byte[], byte[]> consumer;
- private final KafkaRecordDeserializationSchema<T> deserializationSchema;
private final Map<TopicPartition, Long> stoppingOffsets;
- private final SimpleCollector<T> collector;
private final String groupId;
private final int subtaskId;
@@ -85,7 +74,6 @@ public class KafkaPartitionSplitReader<T>
public KafkaPartitionSplitReader(
Properties props,
- KafkaRecordDeserializationSchema<T> deserializationSchema,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
this.subtaskId = context.getIndexOfSubtask();
@@ -95,8 +83,6 @@ public class KafkaPartitionSplitReader<T>
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
this.consumer = new KafkaConsumer<>(consumerProps);
this.stoppingOffsets = new HashMap<>();
- this.deserializationSchema = deserializationSchema;
- this.collector = new SimpleCollector<>();
this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
// Metric registration
@@ -105,84 +91,39 @@ public class KafkaPartitionSplitReader<T>
}
@Override
- public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
- KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
- new KafkaPartitionSplitRecords<>();
+ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
ConsumerRecords<byte[], byte[]> consumerRecords;
try {
consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
} catch (WakeupException we) {
- recordsBySplits.prepareForRead();
- return recordsBySplits;
+ return new KafkaPartitionSplitRecords(
+ ConsumerRecords.empty(), kafkaSourceReaderMetrics);
}
-
+ KafkaPartitionSplitRecords recordsBySplits =
+ new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition tp : consumerRecords.partitions()) {
long stoppingOffset = getStoppingOffset(tp);
- String splitId = tp.toString();
- Collection<Tuple3<T, Long, Long>> recordsForSplit =
- recordsBySplits.recordsForSplit(splitId);
final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
consumerRecords.records(tp);
- for (ConsumerRecord<byte[], byte[]> consumerRecord : recordsFromPartition) {
- // Stop consuming from this partition if the offsets has reached the stopping
- // offset.
- // Note that there are two cases, either case finishes a split:
- // 1. After processing a record with offset of "stoppingOffset - 1". The split
- // reader
- // should not continue fetching because the record with stoppingOffset may not
- // exist.
- // 2. Before processing a record whose offset is greater than or equals to the
- // stopping
- // offset. This should only happens when case 1 was not met due to log compaction
- // or
- // log retention.
- // Case 2 is handled here. Case 1 is handled after the record is processed.
- if (consumerRecord.offset() >= stoppingOffset) {
+
+ if (recordsFromPartition.size() > 0) {
+ final ConsumerRecord<byte[], byte[]> lastRecord =
+ recordsFromPartition.get(recordsFromPartition.size() - 1);
+
+ // After processing a record with offset of "stoppingOffset - 1", the split reader
+ // should not continue fetching because the record with stoppingOffset may not
+ // exist. Keep polling will just block forever.
+ if (lastRecord.offset() >= stoppingOffset - 1) {
+ recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
- consumerRecord.offset(),
+ lastRecord.offset(),
finishedPartitions,
recordsBySplits);
- break;
}
- // Add the record to the partition collector.
- try {
- deserializationSchema.deserialize(consumerRecord, collector);
- collector
- .getRecords()
- .forEach(
- r ->
- recordsForSplit.add(
- new Tuple3<>(
- r,
- consumerRecord.offset(),
- consumerRecord.timestamp())));
- // Finish the split because there might not be any message after this point.
- // Keep polling
- // will just block forever.
- if (consumerRecord.offset() == stoppingOffset - 1) {
- finishSplitAtRecord(
- tp,
- stoppingOffset,
- consumerRecord.offset(),
- finishedPartitions,
- recordsBySplits);
- }
- } catch (Exception e) {
- throw new IOException("Failed to deserialize consumer record due to", e);
- } finally {
- collector.reset();
- }
- }
-
- // Use the last record for updating offset metrics
- if (recordsFromPartition.size() > 0) {
- kafkaSourceReaderMetrics.recordCurrentOffset(
- tp, recordsFromPartition.get(recordsFromPartition.size() - 1).offset());
}
-
// Track this partition's record lag if it never appears before
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
}
@@ -199,7 +140,6 @@ public class KafkaPartitionSplitReader<T>
finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
unassignPartitions(finishedPartitions);
}
- recordsBySplits.prepareForRead();
// Update numBytesIn
kafkaSourceReaderMetrics.updateNumBytesInCounter();
@@ -422,7 +362,7 @@ public class KafkaPartitionSplitReader<T>
long stoppingOffset,
long currentOffset,
List<TopicPartition> finishedPartitions,
- KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
+ KafkaPartitionSplitRecords recordsBySplits) {
LOG.debug(
"{} has reached stopping offset {}, current offset is {}",
tp,
@@ -452,57 +392,67 @@ public class KafkaPartitionSplitReader<T>
// ---------------- private helper class ------------------------
- private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
- private final Map<String, Collection<T>> recordsBySplits;
- private final Set<String> finishedSplits;
- private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
- private String currentSplitId;
- private Iterator<T> recordIterator;
-
- private KafkaPartitionSplitRecords() {
- this.recordsBySplits = new HashMap<>();
- this.finishedSplits = new HashSet<>();
+ private static class KafkaPartitionSplitRecords
+ implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+ private final Set<String> finishedSplits = new HashSet<>();
+ private final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
+ private final ConsumerRecords<byte[], byte[]> consumerRecords;
+ private final KafkaSourceReaderMetrics metrics;
+ private final Iterator<TopicPartition> splitIterator;
+ private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+ private TopicPartition currentTopicPartition;
+ private Long currentSplitStoppingOffset;
+
+ private KafkaPartitionSplitRecords(
+ ConsumerRecords<byte[], byte[]> consumerRecords, KafkaSourceReaderMetrics metrics) {
+ this.consumerRecords = consumerRecords;
+ this.splitIterator = consumerRecords.partitions().iterator();
+ this.metrics = metrics;
}
- private Collection<T> recordsForSplit(String splitId) {
- return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
+ private void setPartitionStoppingOffset(
+ TopicPartition topicPartition, long stoppingOffset) {
+ stoppingOffsets.put(topicPartition, stoppingOffset);
}
private void addFinishedSplit(String splitId) {
finishedSplits.add(splitId);
}
- private void prepareForRead() {
- splitIterator = recordsBySplits.entrySet().iterator();
- }
-
- @Override
@Nullable
+ @Override
public String nextSplit() {
if (splitIterator.hasNext()) {
- Map.Entry<String, Collection<T>> entry = splitIterator.next();
- currentSplitId = entry.getKey();
- recordIterator = entry.getValue().iterator();
- return currentSplitId;
+ currentTopicPartition = splitIterator.next();
+ recordIterator = consumerRecords.records(currentTopicPartition).iterator();
+ currentSplitStoppingOffset =
+ stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
+ return currentTopicPartition.toString();
} else {
- currentSplitId = null;
+ currentTopicPartition = null;
recordIterator = null;
+ currentSplitStoppingOffset = null;
return null;
}
}
- @Override
@Nullable
- public T nextRecordFromSplit() {
+ @Override
+ public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
Preconditions.checkNotNull(
- currentSplitId,
+ currentTopicPartition,
"Make sure nextSplit() did not return null before "
+ "iterate over the records split.");
if (recordIterator.hasNext()) {
- return recordIterator.next();
- } else {
- return null;
+ final ConsumerRecord<byte[], byte[]> record = recordIterator.next();
+ // Only emit records before stopping offset
+ if (record.offset() < currentSplitStoppingOffset) {
+ metrics.recordCurrentOffset(currentTopicPartition, record.offset());
+ return record;
+ }
}
+ return null;
}
@Override
@@ -510,24 +460,4 @@ public class KafkaPartitionSplitReader<T>
return finishedSplits;
}
}
-
- private static class SimpleCollector<T> implements Collector<T> {
- private final List<T> records = new ArrayList<>();
-
- @Override
- public void collect(T record) {
- records.add(record);
- }
-
- @Override
- public void close() {}
-
- private List<T> getRecords() {
- return records;
- }
-
- private void reset() {
- records.clear();
- }
- }
}
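A note on the reworked stopping-offset handling above: the fetch loop now finishes a split as soon as the last polled record's offset reaches stoppingOffset - 1 (or passes it, which can happen with log compaction or retention), and KafkaPartitionSplitRecords.nextRecordFromSplit() filters out any record at or beyond the stopping offset. For example, with stoppingOffset = 100 and a poll returning offsets 95..104, the split is marked finished and only offsets 95..99 are emitted downstream.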
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
index 9470559..486ef05 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java
@@ -19,21 +19,61 @@
package org.apache.flink.connector.kafka.source.reader;
import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
/** The {@link RecordEmitter} implementation for {@link KafkaSourceReader}. */
public class KafkaRecordEmitter<T>
- implements RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> {
+ implements RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> {
+
+ private final KafkaRecordDeserializationSchema<T> deserializationSchema;
+ private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
+
+ public KafkaRecordEmitter(KafkaRecordDeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
@Override
public void emitRecord(
- Tuple3<T, Long, Long> element,
+ ConsumerRecord<byte[], byte[]> consumerRecord,
SourceOutput<T> output,
KafkaPartitionSplitState splitState)
throws Exception {
- output.collect(element.f0, element.f2);
- splitState.setCurrentOffset(element.f1 + 1);
+ try {
+ sourceOutputWrapper.setSourceOutput(output);
+ sourceOutputWrapper.setTimestamp(consumerRecord.timestamp());
+ deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);
+ splitState.setCurrentOffset(consumerRecord.offset() + 1);
+ } catch (Exception e) {
+ throw new IOException("Failed to deserialize consumer record due to", e);
+ }
+ }
+
+ private static class SourceOutputWrapper<T> implements Collector<T> {
+
+ private SourceOutput<T> sourceOutput;
+ private long timestamp;
+
+ @Override
+ public void collect(T record) {
+ sourceOutput.collect(record, timestamp);
+ }
+
+ @Override
+ public void close() {}
+
+ private void setSourceOutput(SourceOutput<T> sourceOutput) {
+ this.sourceOutput = sourceOutput;
+ }
+
+ private void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
}
}
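Worth noting in the emitter above: SourceOutputWrapper is a single mutable instance, re-pointed at the current SourceOutput and timestamp before each deserialize() call, so a deserializer may emit any number of records per ConsumerRecord and each one is forwarded with that record's Kafka timestamp. The split offset is only advanced after deserialization succeeds, so a record that fails to deserialize is not silently skipped past.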
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 4a42a48..e9a40cd 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.kafka.source.reader;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -32,6 +31,7 @@ import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcher
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -49,7 +49,7 @@ import java.util.concurrent.ConcurrentMap;
/** The source reader for Kafka partitions. */
public class KafkaSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
- Tuple3<T, Long, Long>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
+ ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
// These maps need to be concurrent because it will be accessed by both the main thread
// and the split fetcher thread in the callback.
@@ -59,9 +59,11 @@ public class KafkaSourceReader<T>
private final boolean commitOffsetsOnCheckpoint;
public KafkaSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
- KafkaSourceFetcherManager<T> kafkaSourceFetcherManager,
- RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> recordEmitter,
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+ elementsQueue,
+ KafkaSourceFetcherManager kafkaSourceFetcherManager,
+ RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState>
+ recordEmitter,
Configuration config,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
@@ -134,7 +136,7 @@ public class KafkaSourceReader<T>
return;
}
- ((KafkaSourceFetcherManager<T>) splitFetcherManager)
+ ((KafkaSourceFetcherManager) splitFetcherManager)
.commitOffsets(
committedPartitions,
(ignored, e) -> {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
index f4a804b..81c9635 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.kafka.source.reader.fetcher;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
@@ -29,6 +28,7 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
@@ -46,8 +46,8 @@ import java.util.function.Supplier;
* Kafka using the KafkaConsumer inside the {@link
* org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader}.
*/
-public class KafkaSourceFetcherManager<T>
- extends SingleThreadFetcherManager<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
+public class KafkaSourceFetcherManager
+ extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);
/**
@@ -61,8 +61,10 @@ public class KafkaSourceFetcherManager<T>
* @param splitFinishedHook Hook for handling finished splits in split fetchers.
*/
public KafkaSourceFetcherManager(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
- Supplier<SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit>> splitReaderSupplier,
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
+ elementsQueue,
+ Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit>>
+ splitReaderSupplier,
Consumer<Collection<String>> splitFinishedHook) {
super(elementsQueue, splitReaderSupplier, splitFinishedHook);
}
@@ -73,7 +75,8 @@ public class KafkaSourceFetcherManager<T>
if (offsetsToCommit.isEmpty()) {
return;
}
- SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = fetchers.get(0);
+ SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher =
+ fetchers.get(0);
if (splitFetcher != null) {
// The fetcher thread is still running. This should be the majority of the cases.
enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
@@ -85,11 +88,11 @@ public class KafkaSourceFetcherManager<T>
}
private void enqueueOffsetsCommitTask(
- SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher,
+ SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher,
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback callback) {
- KafkaPartitionSplitReader<T> kafkaReader =
- (KafkaPartitionSplitReader<T>) splitFetcher.getSplitReader();
+ KafkaPartitionSplitReader kafkaReader =
+ (KafkaPartitionSplitReader) splitFetcher.getSplitReader();
splitFetcher.enqueueTask(
new SplitFetcherTask() {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 47891aa4..cf8e501 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -55,6 +55,8 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@@ -66,6 +68,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -92,9 +95,11 @@ public class KafkaSourceITCase {
KafkaSourceTestEnv.tearDown();
}
- @Test
- public void testTimestamp() throws Throwable {
- final String topic = "testTimestamp";
+ @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+ @ValueSource(booleans = {false, true})
+ public void testTimestamp(boolean enableObjectReuse) throws Throwable {
+ final String topic =
+ "testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
final long currentTimestamp = System.currentTimeMillis();
KafkaSourceTestEnv.createTestTopic(topic, 1, 1);
KafkaSourceTestEnv.produceToKafka(
@@ -108,7 +113,8 @@ public class KafkaSourceITCase {
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testTimestampAndWatermark")
.setTopics(topic)
- .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ .setDeserializer(
+ new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
@@ -132,14 +138,16 @@ public class KafkaSourceITCase {
result.getAccumulatorResult("timestamp"));
}
- @Test
- public void testBasicRead() throws Exception {
+ @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+ @ValueSource(booleans = {false, true})
+ public void testBasicRead(boolean enableObjectReuse) throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testBasicRead")
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
- .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ .setDeserializer(
+ new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
@@ -196,14 +204,16 @@ public class KafkaSourceITCase {
assertEquals(expectedSum, actualSum.get());
}
- @Test
- public void testRedundantParallelism() throws Exception {
+ @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+ @ValueSource(booleans = {false, true})
+ public void testRedundantParallelism(boolean enableObjectReuse) throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setGroupId("testRedundantParallelism")
.setTopics(Collections.singletonList(TOPIC1))
- .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ .setDeserializer(
+ new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
@@ -220,13 +230,15 @@ public class KafkaSourceITCase {
executeAndVerify(env, stream);
}
- @Test
- public void testBasicReadWithoutGroupId() throws Exception {
+ @ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
+ @ValueSource(booleans = {false, true})
+ public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Exception {
KafkaSource<PartitionAndValue> source =
KafkaSource.<PartitionAndValue>builder()
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopics(Arrays.asList(TOPIC1, TOPIC2))
- .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ .setDeserializer(
+ new TestingKafkaRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
@@ -276,8 +288,10 @@ public class KafkaSourceITCase {
private static class PartitionAndValue implements Serializable {
private static final long serialVersionUID = 4813439951036021779L;
- private final String tp;
- private final int value;
+ private String tp;
+ private int value;
+
+ public PartitionAndValue() {}
private PartitionAndValue(TopicPartition tp, int value) {
this.tp = tp.toString();
@@ -289,6 +303,12 @@ public class KafkaSourceITCase {
implements KafkaRecordDeserializationSchema<PartitionAndValue> {
private static final long serialVersionUID = -3765473065594331694L;
private transient Deserializer<Integer> deserializer;
+ private final boolean enableObjectReuse;
+ private final PartitionAndValue reuse = new PartitionAndValue();
+
+ public TestingKafkaRecordDeserializationSchema(boolean enableObjectReuse) {
+ this.enableObjectReuse = enableObjectReuse;
+ }
@Override
public void deserialize(
@@ -297,10 +317,17 @@ public class KafkaSourceITCase {
if (deserializer == null) {
deserializer = new IntegerDeserializer();
}
- collector.collect(
- new PartitionAndValue(
- new TopicPartition(record.topic(), record.partition()),
- deserializer.deserialize(record.topic(), record.value())));
+
+ if (enableObjectReuse) {
+ reuse.tp = new TopicPartition(record.topic(), record.partition()).toString();
+ reuse.value = deserializer.deserialize(record.topic(), record.value());
+ collector.collect(reuse);
+ } else {
+ collector.collect(
+ new PartitionAndValue(
+ new TopicPartition(record.topic(), record.partition()),
+ deserializer.deserialize(record.topic(), record.value())));
+ }
}
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index d4dd33f..864d8e9 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -18,16 +18,13 @@
package org.apache.flink.connector.kafka.source.reader;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv;
-import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
@@ -40,6 +37,7 @@ import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -82,6 +80,8 @@ public class KafkaPartitionSplitReaderTest {
private static Map<Integer, Map<String, KafkaPartitionSplit>> splitsByOwners;
private static Map<TopicPartition, Long> earliestOffsets;
+ private final IntegerDeserializer deserializer = new IntegerDeserializer();
+
@BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
@@ -101,14 +101,14 @@ public class KafkaPartitionSplitReaderTest {
@Test
public void testHandleSplitChangesAndFetch() throws Exception {
- KafkaPartitionSplitReader<Integer> reader = createReader();
+ KafkaPartitionSplitReader reader = createReader();
assignSplitsAndFetchUntilFinish(reader, 0);
assignSplitsAndFetchUntilFinish(reader, 1);
}
@Test
public void testWakeUp() throws Exception {
- KafkaPartitionSplitReader<Integer> reader = createReader();
+ KafkaPartitionSplitReader reader = createReader();
TopicPartition nonExistingTopicPartition = new TopicPartition("NotExist", 0);
assignSplits(
reader,
@@ -141,7 +141,7 @@ public class KafkaPartitionSplitReaderTest {
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
final Counter numBytesInCounter =
operatorMetricGroup.getIOMetricGroup().getNumBytesInCounter();
- KafkaPartitionSplitReader<Integer> reader =
+ KafkaPartitionSplitReader reader =
createReader(
new Properties(),
InternalSourceReaderMetricGroup.wrap(operatorMetricGroup));
@@ -180,7 +180,7 @@ public class KafkaPartitionSplitReaderTest {
MetricListener metricListener = new MetricListener();
final Properties props = new Properties();
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
- KafkaPartitionSplitReader<Integer> reader =
+ KafkaPartitionSplitReader reader =
createReader(
props,
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
@@ -217,7 +217,7 @@ public class KafkaPartitionSplitReaderTest {
@Test
public void testAssignEmptySplit() throws Exception {
- KafkaPartitionSplitReader<Integer> reader = createReader();
+ KafkaPartitionSplitReader reader = createReader();
final KafkaPartitionSplit normalSplit =
new KafkaPartitionSplit(
new TopicPartition(TOPIC1, 0),
@@ -231,7 +231,7 @@ public class KafkaPartitionSplitReaderTest {
reader.handleSplitsChanges(new SplitsAddition<>(Arrays.asList(normalSplit, emptySplit)));
// Fetch and check empty splits is added to finished splits
- RecordsWithSplitIds<Tuple3<Integer, Long, Long>> recordsWithSplitIds = reader.fetch();
+ RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
assertTrue(recordsWithSplitIds.finishedSplits().contains(emptySplit.splitId()));
// Assign another valid split to avoid consumer.poll() blocking
@@ -250,20 +250,20 @@ public class KafkaPartitionSplitReaderTest {
// ------------------
- private void assignSplitsAndFetchUntilFinish(
- KafkaPartitionSplitReader<Integer> reader, int readerId) throws IOException {
+ private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
+ throws IOException {
Map<String, KafkaPartitionSplit> splits =
assignSplits(reader, splitsByOwners.get(readerId));
Map<String, Integer> numConsumedRecords = new HashMap<>();
Set<String> finishedSplits = new HashSet<>();
while (finishedSplits.size() < splits.size()) {
- RecordsWithSplitIds<Tuple3<Integer, Long, Long>> recordsBySplitIds = reader.fetch();
+ RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsBySplitIds = reader.fetch();
String splitId = recordsBySplitIds.nextSplit();
while (splitId != null) {
// Collect the records in this split.
- List<Tuple3<Integer, Long, Long>> splitFetch = new ArrayList<>();
- Tuple3<Integer, Long, Long> record;
+ List<ConsumerRecord<byte[], byte[]>> splitFetch = new ArrayList<>();
+ ConsumerRecord<byte[], byte[]> record;
while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
splitFetch.add(record);
}
@@ -305,34 +305,29 @@ public class KafkaPartitionSplitReaderTest {
// ------------------
- private KafkaPartitionSplitReader<Integer> createReader() throws Exception {
+ private KafkaPartitionSplitReader createReader() {
return createReader(
new Properties(), UnregisteredMetricsGroup.createSourceReaderMetricGroup());
}
- private KafkaPartitionSplitReader<Integer> createReader(
- Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup)
- throws Exception {
+ private KafkaPartitionSplitReader createReader(
+ Properties additionalProperties, SourceReaderMetricGroup sourceReaderMetricGroup) {
Properties props = new Properties();
props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
if (!additionalProperties.isEmpty()) {
props.putAll(additionalProperties);
}
- KafkaRecordDeserializationSchema<Integer> deserializationSchema =
- KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class);
- deserializationSchema.open(new TestingDeserializationContext());
KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(sourceReaderMetricGroup);
- return new KafkaPartitionSplitReader<>(
+ return new KafkaPartitionSplitReader(
props,
- deserializationSchema,
new TestingReaderContext(new Configuration(), sourceReaderMetricGroup),
kafkaSourceReaderMetrics);
}
private Map<String, KafkaPartitionSplit> assignSplits(
- KafkaPartitionSplitReader<Integer> reader, Map<String, KafkaPartitionSplit> splits) {
+ KafkaPartitionSplitReader reader, Map<String, KafkaPartitionSplit> splits) {
SplitsChange<KafkaPartitionSplit> splitsChange =
new SplitsAddition<>(new ArrayList<>(splits.values()));
reader.handleSplitsChanges(splitsChange);
@@ -342,16 +337,16 @@ public class KafkaPartitionSplitReaderTest {
private boolean verifyConsumed(
final KafkaPartitionSplit split,
final long expectedStartingOffset,
- final Collection<Tuple3<Integer, Long, Long>> consumed) {
+ final Collection<ConsumerRecord<byte[], byte[]>> consumed) {
long expectedOffset = expectedStartingOffset;
- for (Tuple3<Integer, Long, Long> record : consumed) {
+ for (ConsumerRecord<byte[], byte[]> record : consumed) {
int expectedValue = (int) expectedOffset;
long expectedTimestamp = expectedOffset * 1000L;
- assertEquals(expectedValue, (int) record.f0);
- assertEquals(expectedOffset, (long) record.f1);
- assertEquals(expectedTimestamp, (long) record.f2);
+ assertEquals(expectedValue, deserializer.deserialize(record.topic(), record.value()));
+ assertEquals(expectedOffset, record.offset());
+ assertEquals(expectedTimestamp, record.timestamp());
expectedOffset++;
}