Posted to commits@pulsar.apache.org by ay...@apache.org on 2022/06/17 17:32:04 UTC
[pulsar] branch master updated: [Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new daf42ccc459 [Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)
daf42ccc459 is described below
commit daf42ccc45948f51a95a0412a5c7d80f5ccea167
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 17 10:31:58 2022 -0700
[Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)
* KCA: handle preCommit returning earlier offsets
* corrected parameter name in the comments
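For readers skimming the diff below: Kafka Connect's SinkTask.preCommit() is allowed to return offsets earlier than the currentOffsets passed to it (for example, when a task has only durably written part of what it received). Before this fix the sink acked the whole pendingFlushQueue regardless; now it acks only records whose offsets are covered by what preCommit() actually committed. The following is a minimal sketch of that acking rule, with simplified illustrative names (the partition-keyed map and the println-as-ack are not the real KafkaConnectSink code):

    import java.util.HashMap;
    import java.util.Map;

    public class AckUntilSketch {
        public static void main(String[] args) {
            // Offsets the task reported as committed; partition 0 is behind
            // the offset 10 the framework asked it to commit.
            Map<Integer, Long> committed = new HashMap<>();
            committed.put(0, 7L);

            // Pending record offsets for partition 0, in arrival order.
            long[] pendingOffsets = {5L, 6L, 7L, 8L, 9L, 10L};

            for (long offset : pendingOffsets) {
                Long lastCommitted = committed.get(0);
                if (lastCommitted == null || offset > lastCommitted) {
                    // Not covered by the commit: leave it pending for a later flush.
                    continue;
                }
                System.out.println("acked " + offset); // prints: acked 5, 6, 7
            }
        }
    }

Records 8, 9, and 10 stay in the queue and are retried on the next flush, which is exactly the behavior the changed ackUntil() below implements against the real pendingFlushQueue.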
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 77 +++++++++++++++++++---
.../connect/PulsarKafkaConnectSinkConfig.java | 2 +-
.../io/kafka/connect/KafkaConnectSinkTest.java | 63 ++++++++++++++++++
3 files changed, 131 insertions(+), 11 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index e1a34f64407..9dd574ffae3 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -81,7 +81,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
.build());
- private final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
+ @VisibleForTesting
+ protected final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
private final AtomicBoolean isFlushRunning = new AtomicBoolean(false);
private volatile boolean isRunning = false;
@@ -223,28 +224,84 @@ public class KafkaConnectSink implements Sink<GenericObject> {
}
final Record<GenericObject> lastNotFlushed = pendingFlushQueue.getLast();
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
try {
Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets();
- Map<TopicPartition, OffsetAndMetadata> committedOffsets = task.preCommit(currentOffsets);
- if (committedOffsets.isEmpty()) {
+ committedOffsets = task.preCommit(currentOffsets);
+ if (committedOffsets == null || committedOffsets.isEmpty()) {
log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
return;
}
- taskContext.flushOffsets(currentOffsets);
- ackUntil(lastNotFlushed, Record::ack);
+ if (log.isDebugEnabled() && !areMapsEqual(committedOffsets, currentOffsets)) {
+ log.debug("committedOffsets {} differ from currentOffsets {}", committedOffsets, currentOffsets);
+ }
+ taskContext.flushOffsets(committedOffsets);
+ ackUntil(lastNotFlushed, committedOffsets, Record::ack);
log.info("Flush succeeded");
} catch (Throwable t) {
log.error("error flushing pending records", t);
- ackUntil(lastNotFlushed, Record::fail);
+ ackUntil(lastNotFlushed, committedOffsets, Record::fail);
} finally {
isFlushRunning.compareAndSet(true, false);
}
}
- private void ackUntil(Record<GenericObject> lastNotFlushed, java.util.function.Consumer<Record<GenericObject>> cb) {
- while (!pendingFlushQueue.isEmpty()) {
- Record<GenericObject> r = pendingFlushQueue.pollFirst();
+ private static boolean areMapsEqual(Map<TopicPartition, OffsetAndMetadata> first,
+ Map<TopicPartition, OffsetAndMetadata> second) {
+ if (first.size() != second.size()) {
+ return false;
+ }
+
+ return first.entrySet().stream()
+ .allMatch(e -> e.getValue().equals(second.get(e.getKey())));
+ }
+
+ @VisibleForTesting
+ protected void ackUntil(Record<GenericObject> lastNotFlushed,
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets,
+ java.util.function.Consumer<Record<GenericObject>> cb) {
+ // lastNotFlushed is needed in case of the default preCommit() implementation,
+ // which calls flush() and returns the currentOffsets passed to it.
+ // We don't want to ack messages added to pendingFlushQueue after the preCommit/flush call.
+
+ // Build a per-topic lookup map to avoid creating a new TopicPartition for each record in pendingFlushQueue
+ Map<String, Map<Integer, Long>> topicOffsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> e: committedOffsets.entrySet()) {
+ TopicPartition tp = e.getKey();
+ if (!topicOffsets.containsKey(tp.topic())) {
+ topicOffsets.put(tp.topic(), new HashMap<>());
+ }
+ Map<Integer, Long> partitionOffset = topicOffsets.get(tp.topic());
+ partitionOffset.put(tp.partition(), e.getValue().offset());
+ }
+
+ for (Record<GenericObject> r : pendingFlushQueue) {
+ final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
+ final int partition = r.getPartitionIndex().orElse(0);
+
+ Long lastCommittedOffset = null;
+ if (topicOffsets.containsKey(topic)) {
+ lastCommittedOffset = topicOffsets.get(topic).get(partition);
+ }
+
+ if (lastCommittedOffset == null) {
+ if (r == lastNotFlushed) {
+ break;
+ }
+ continue;
+ }
+
+ long offset = getMessageOffset(r);
+
+ if (offset > lastCommittedOffset) {
+ if (r == lastNotFlushed) {
+ break;
+ }
+ continue;
+ }
+
cb.accept(r);
+ pendingFlushQueue.remove(r);
currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
if (r == lastNotFlushed) {
break;
@@ -256,7 +313,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
if (sourceRecord.getMessage().isPresent()) {
// Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
- // Requires enableExposingBrokerEntryMetadataToClient=true on brokers.
+ // Requires exposingBrokerEntryMetadataToClientEnabled=true on brokers.
if (useIndexAsOffset && sourceRecord.getMessage().get().hasIndex()) {
return sourceRecord.getMessage().get()
.getIndex().orElse(-1L);
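The scenario guarded against above comes straight from the SinkTask contract: the default preCommit() flushes synchronously and echoes currentOffsets back, but a task is free to override it and report less. A hypothetical override (durablyWrittenOffsets() is an invented helper, not part of the Kafka Connect API) might look like:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class AsyncFlushingSinkTask extends SinkTask {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> preCommit(
                Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            // Rather than flushing synchronously and returning currentOffsets
            // (the default behavior), report only what the downstream system
            // has acknowledged so far; these offsets may trail currentOffsets.
            return durablyWrittenOffsets();
        }

        // Invented helper: offsets confirmed durable by the sink's backend.
        protected abstract Map<TopicPartition, OffsetAndMetadata> durablyWrittenOffsets();
    }

Such a task was previously at risk of having unflushed Pulsar records acked; with this change the sink trusts the returned map, flushes those offsets, and keeps everything newer queued.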
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 1241f856c55..a6e8b5c966e 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -77,7 +77,7 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
defaultValue = "true",
help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+ "Requires AppendIndexMetadataInterceptor and "
- + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+ + "exposingBrokerEntryMetadataToClientEnabled=true on brokers.")
private boolean useIndexAsOffset = true;
@FieldDoc(
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 91ae86f97eb..7661e5fc98d 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -83,6 +84,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -1163,6 +1165,67 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
sink.close();
}
+
+ @Test
+ public void partialPreCommitTest() throws Exception {
+ props.put("useIndexAsOffset", "false");
+ props.put("maxBatchBitsForOffset", "0");
+ final String topicName = "testTopic";
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+ sink.open(props, context);
+
+ final int numPartitions = 3;
+ for (long i = 0; i < 30; i++) {
+ final AtomicLong ledgerId = new AtomicLong(0L);
+ final AtomicLong entryId = new AtomicLong(i);
+ final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+ when(msg.hasIndex()).thenReturn(false);
+
+ final int partition = (int)(i % numPartitions);
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName(topicName)
+ .partition(partition)
+ .message(msg)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .schema(Schema.STRING)
+ .build();
+
+ sink.write(record);
+ }
+
+ ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = sink.pendingFlushQueue;
+ assertEquals(pendingFlushQueue.size(), 30);
+
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(new TopicPartition(topicName, 0),
+ new OffsetAndMetadata(3L, Optional.empty(), null));
+ committedOffsets.put(new TopicPartition(topicName, 1),
+ new OffsetAndMetadata(4L, Optional.empty(), null));
+ committedOffsets.put(new TopicPartition(topicName, 2),
+ new OffsetAndMetadata(5L, Optional.empty(), null));
+
+ Record<GenericObject> lastNotFlushed = pendingFlushQueue.peekFirst();
+ Record<GenericObject> lastNotFlushedAfter = (Record<GenericObject>) pendingFlushQueue.stream().toArray()[15];
+
+ final AtomicInteger ackedMessagesCount = new AtomicInteger(0);
+ sink.ackUntil(lastNotFlushed, committedOffsets, x -> ackedMessagesCount.incrementAndGet());
+ assertEquals(ackedMessagesCount.get(), 1);
+ assertEquals(pendingFlushQueue.size(), 29);
+
+ sink.ackUntil(lastNotFlushedAfter, committedOffsets, x -> ackedMessagesCount.incrementAndGet());
+ assertEquals(ackedMessagesCount.get(), 6);
+ assertEquals(pendingFlushQueue.size(), 24);
+
+ sink.close();
+ }
+
private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
.setField1(1)