You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ay...@apache.org on 2022/06/17 17:32:04 UTC

[pulsar] branch master updated: [Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new daf42ccc459 [Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)
daf42ccc459 is described below

commit daf42ccc45948f51a95a0412a5c7d80f5ccea167
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 17 10:31:58 2022 -0700

    [Fix][pulsar-io] KCA: handle kafka preCommit() returning earlier offsets than requested (#16100)
    
    * KCA: handle preCommit returning earlier offsets
    
    * corrected parameter name in the comments
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 77 +++++++++++++++++++---
 .../connect/PulsarKafkaConnectSinkConfig.java      |  2 +-
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 63 ++++++++++++++++++
 3 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index e1a34f64407..9dd574ffae3 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -81,7 +81,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
                     .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
                     .build());
-    private final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
+    @VisibleForTesting
+    protected final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
     private final AtomicBoolean isFlushRunning = new AtomicBoolean(false);
     private volatile boolean isRunning = false;
 
@@ -223,28 +224,84 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         }
 
         final Record<GenericObject> lastNotFlushed = pendingFlushQueue.getLast();
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
         try {
             Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets();
-            Map<TopicPartition, OffsetAndMetadata> committedOffsets = task.preCommit(currentOffsets);
-            if (committedOffsets.isEmpty()) {
+            committedOffsets = task.preCommit(currentOffsets);
+            if (committedOffsets == null || committedOffsets.isEmpty()) {
                 log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
                 return;
             }
-            taskContext.flushOffsets(currentOffsets);
-            ackUntil(lastNotFlushed, Record::ack);
+            if (log.isDebugEnabled() && !areMapsEqual(committedOffsets, currentOffsets)) {
+                log.debug("committedOffsets {} differ from currentOffsets {}", committedOffsets, currentOffsets);
+            }
+            taskContext.flushOffsets(committedOffsets);
+            ackUntil(lastNotFlushed, committedOffsets, Record::ack);
             log.info("Flush succeeded");
         } catch (Throwable t) {
             log.error("error flushing pending records", t);
-            ackUntil(lastNotFlushed, Record::fail);
+            ackUntil(lastNotFlushed, committedOffsets, Record::fail);
         } finally {
             isFlushRunning.compareAndSet(true, false);
         }
     }
 
-    private void ackUntil(Record<GenericObject> lastNotFlushed, java.util.function.Consumer<Record<GenericObject>> cb) {
-        while (!pendingFlushQueue.isEmpty()) {
-            Record<GenericObject> r = pendingFlushQueue.pollFirst();
+    private static boolean areMapsEqual(Map<TopicPartition, OffsetAndMetadata> first,
+                                        Map<TopicPartition, OffsetAndMetadata> second) {
+        if (first.size() != second.size()) {
+            return false;
+        }
+
+        return first.entrySet().stream()
+                .allMatch(e -> e.getValue().equals(second.get(e.getKey())));
+    }
+
+    @VisibleForTesting
+    protected void ackUntil(Record<GenericObject> lastNotFlushed,
+                          Map<TopicPartition, OffsetAndMetadata> committedOffsets,
+                          java.util.function.Consumer<Record<GenericObject>> cb) {
+        // lastNotFlushed is needed for the default preCommit() implementation,
+        // which calls flush() and returns the currentOffsets passed to it.
+        // We don't want to ack messages added to pendingFlushQueue after the preCommit/flush call.
+
+        // Group offsets by topic and partition to avoid creating a TopicPartition per record in pendingFlushQueue
+        Map<String, Map<Integer, Long>> topicOffsets = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> e: committedOffsets.entrySet()) {
+            TopicPartition tp = e.getKey();
+            if (!topicOffsets.containsKey(tp.topic())) {
+                topicOffsets.put(tp.topic(), new HashMap<>());
+            }
+            Map<Integer, Long> partitionOffset = topicOffsets.get(tp.topic());
+            partitionOffset.put(tp.partition(), e.getValue().offset());
+        }
+
+        for (Record<GenericObject> r : pendingFlushQueue) {
+            final String topic = sanitizeNameIfNeeded(r.getTopicName().orElse(topicName), sanitizeTopicName);
+            final int partition = r.getPartitionIndex().orElse(0);
+
+            Long lastCommittedOffset = null;
+            if (topicOffsets.containsKey(topic)) {
+                lastCommittedOffset = topicOffsets.get(topic).get(partition);
+            }
+
+            if (lastCommittedOffset == null) {
+                if (r == lastNotFlushed) {
+                    break;
+                }
+                continue;
+            }
+
+            long offset = getMessageOffset(r);
+
+            if (offset > lastCommittedOffset) {
+                if (r == lastNotFlushed) {
+                    break;
+                }
+                continue;
+            }
+
             cb.accept(r);
+            pendingFlushQueue.remove(r);
             currentBatchSize.addAndGet(-1 * r.getMessage().get().size());
             if (r == lastNotFlushed) {
                 break;
@@ -256,7 +313,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
         if (sourceRecord.getMessage().isPresent()) {
             // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
-            // Requires enableExposingBrokerEntryMetadataToClient=true on brokers.
+            // Requires exposingBrokerEntryMetadataToClientEnabled=true on brokers.
             if (useIndexAsOffset && sourceRecord.getMessage().get().hasIndex()) {
                 return sourceRecord.getMessage().get()
                         .getIndex().orElse(-1L);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index 1241f856c55..a6e8b5c966e 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -77,7 +77,7 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             defaultValue = "true",
             help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
                     + "Requires AppendIndexMetadataInterceptor and "
-                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+                    + "exposingBrokerEntryMetadataToClientEnabled=true on brokers.")
     private boolean useIndexAsOffset = true;
 
     @FieldDoc(
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 91ae86f97eb..7661e5fc98d 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -83,6 +84,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -1163,6 +1165,67 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.close();
     }
 
+
+    @Test
+    public void partialPreCommitTest() throws Exception {
+        props.put("useIndexAsOffset", "false");
+        props.put("maxBatchBitsForOffset", "0");
+        final String topicName = "testTopic";
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+        sink.open(props, context);
+
+        final int numPartitions = 3;
+        for (long i = 0; i < 30; i++) {
+            final AtomicLong ledgerId = new AtomicLong(0L);
+            final AtomicLong entryId = new AtomicLong(i);
+            final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+            Message msg = mock(MessageImpl.class);
+            when(msg.getValue()).thenReturn(rec);
+            when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+            when(msg.hasIndex()).thenReturn(false);
+
+            final int partition = (int)(i % numPartitions);
+            final AtomicInteger status = new AtomicInteger(0);
+            Record<GenericObject> record = PulsarRecord.<String>builder()
+                    .topicName(topicName)
+                    .partition(partition)
+                    .message(msg)
+                    .ackFunction(status::incrementAndGet)
+                    .failFunction(status::decrementAndGet)
+                    .schema(Schema.STRING)
+                    .build();
+
+            sink.write(record);
+        }
+
+        ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = sink.pendingFlushQueue;
+        assertEquals(pendingFlushQueue.size(), 30);
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(topicName, 0),
+                new OffsetAndMetadata(3L, Optional.empty(), null));
+        committedOffsets.put(new TopicPartition(topicName, 1),
+                new OffsetAndMetadata(4L, Optional.empty(), null));
+        committedOffsets.put(new TopicPartition(topicName, 2),
+                new OffsetAndMetadata(5L, Optional.empty(), null));
+
+        Record<GenericObject> lastNotFlushed = pendingFlushQueue.peekFirst();
+        Record<GenericObject> lastNotFlushedAfter = (Record<GenericObject>) pendingFlushQueue.stream().toArray()[15];
+
+        final AtomicInteger ackedMessagesCount = new AtomicInteger(0);
+        sink.ackUntil(lastNotFlushed, committedOffsets, x -> ackedMessagesCount.incrementAndGet());
+        assertEquals(ackedMessagesCount.get(), 1);
+        assertEquals(pendingFlushQueue.size(), 29);
+
+        sink.ackUntil(lastNotFlushedAfter, committedOffsets, x -> ackedMessagesCount.incrementAndGet());
+        assertEquals(ackedMessagesCount.get(), 6);
+        assertEquals(pendingFlushQueue.size(), 24);
+
+        sink.close();
+    }
+
     private static PulsarSchemaToKafkaSchemaTest.StructWithAnnotations getPojoStructWithAnnotations() {
         return new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations()
                 .setField1(1)