You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 07:35:12 UTC

[pulsar] branch master updated: KCA to use index (if available) instead of sequence and to handle batched messages non-unique sequenceIds (#16098)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a18c01df761 KCA to use index (if available) instead of sequence and to handle batched messages non-unique sequenceIds (#16098)
a18c01df761 is described below

commit a18c01df7617b71f936561ac0abae62fb175f063
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 17 00:35:02 2022 -0700

    KCA to use index (if available) instead of sequence and to handle batched messages non-unique sequenceIds (#16098)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  67 +++++++-
 .../connect/PulsarKafkaConnectSinkConfig.java      |  14 ++
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 181 ++++++++++++++++++++-
 3 files changed, 259 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 31f7cbf6399..e1a34f64407 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -48,9 +48,13 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
@@ -91,6 +95,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             CacheBuilder.newBuilder().maximumSize(1000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
+    private int maxBatchBitsForOffset = 12;
+    private boolean useIndexAsOffset = true;
+
     @Override
     public void write(Record<GenericObject> sourceRecord) {
         if (log.isDebugEnabled()) {
@@ -147,6 +154,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
         sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
 
+        useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
+        maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
+        Preconditions.checkArgument(maxBatchBitsForOffset <= 20,
+                "Cannot use more than 20 bits for maxBatchBitsForOffset");
+
         String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass();
         kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
 
@@ -240,6 +252,58 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         }
     }
 
+    private long getMessageOffset(Record<GenericObject> sourceRecord) {
+
+        if (sourceRecord.getMessage().isPresent()) {
+            // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
+            // Requires enableExposingBrokerEntryMetadataToClient=true on brokers.
+            if (useIndexAsOffset && sourceRecord.getMessage().get().hasIndex()) {
+                return sourceRecord.getMessage().get()
+                        .getIndex().orElse(-1L);
+            }
+
+            MessageId messageId = sourceRecord.getMessage().get().getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+                    ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+                    : messageId);
+
+            // sourceRecord.getRecordSequence() is not unique
+            // for the messages from the same batch.
+            // Special case for FunctionCommon.getSequenceId()
+            if (maxBatchBitsForOffset > 0 && msgId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+                long ledgerId = batchMsgId.getLedgerId();
+                long entryId = batchMsgId.getEntryId();
+
+                if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
+                    log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
+                }
+
+                int batchIdx = batchMsgId.getBatchIndex();
+
+                if (batchIdx < 0) {
+                    // Should not happen unless data corruption
+                    log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
+                    batchIdx = 0;
+                }
+                if (batchIdx > (1 << maxBatchBitsForOffset)) {
+                    log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
+                }
+                // Combine entry id and batchIdx
+                entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
+
+                // The same as FunctionCommon.getSequenceId():
+                // Combine ledger id and entry id to form offset
+                // Use less than 32 bits to represent entry id since it will get
+                // rolled over way before overflowing the max int range
+                long offset = (ledgerId << 28) | entryId;
+                return offset;
+            }
+        }
+        return sourceRecord.getRecordSequence()
+                .orElse(-1L);
+    }
+
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -282,8 +346,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
         }
 
-        long offset = sourceRecord.getRecordSequence()
-                .orElse(-1L);
+        long offset = getMessageOffset(sourceRecord);
         if (offset < 0) {
             log.error("Message without sequenceId. Key: {} Value: {}", key, value);
             throw new IllegalStateException("Message without sequenceId");
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index b3feea2ab3a..1241f856c55 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+    private boolean useIndexAsOffset = true;
+
+    @FieldDoc(
+            defaultValue = "12",
+            help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+                    + "0 to disable this behavior (Messages from the same batch will have the same "
+                    + "offset which can affect some connectors.)")
+    private int maxBatchBitsForOffset = 12;
+
     @FieldDoc(
             defaultValue = "false",
             help = "Some connectors cannot handle pulsar topic names like persistent://a/b/topic"
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 2c0ea31ac4c..91ae86f97eb 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -75,9 +76,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.AbstractMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,6 +95,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -928,11 +933,16 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     @Test
     public void offsetTest() throws Exception {
+        props.put("useIndexAsOffset", "true");
+        props.put("maxBatchBitsForOffset", "12");
+
+        final AtomicLong ledgerId = new AtomicLong(0L);
         final AtomicLong entryId = new AtomicLong(0L);
         final GenericRecord rec = getGenericRecord("value", Schema.STRING);
         Message msg = mock(MessageImpl.class);
         when(msg.getValue()).thenReturn(rec);
-        when(msg.getMessageId()).then(x -> new MessageIdImpl(0, entryId.getAndIncrement(), 0));
+        when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+        when(msg.hasIndex()).thenReturn(false);
 
         final String topicName = "testTopic";
         final int partition = 1;
@@ -959,6 +969,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         // offset is 0 for the first written record
         assertEquals(sink.currentOffset(topicName, partition), 0);
 
+        entryId.set(1);
         sink.write(record);
         sink.flush();
         // offset is 1 for the second written record
@@ -976,11 +987,179 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         // offset is 1 after reopening the producer
         assertEquals(sink.currentOffset(topicName, partition), 1);
 
+        entryId.set(2);
         sink.write(record);
         sink.flush();
         // offset is 2 for the next written record
         assertEquals(sink.currentOffset(topicName, partition), 2);
 
+
+        // use index
+        entryId.set(999);
+        when(msg.hasIndex()).thenReturn(true);
+        when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+        sink.write(record);
+        sink.flush();
+        // offset is 777 for the next written record according to index
+        assertEquals(sink.currentOffset(topicName, partition), 777);
+
+        final AtomicInteger batchIdx = new AtomicInteger(2);
+
+        entryId.set(3);
+        when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+        when(msg.hasIndex()).thenReturn(false);
+        sink.write(record);
+        sink.flush();
+        // offset is the batch message id includes batch
+        // (3 << 12) | 2
+        assertEquals(sink.currentOffset(topicName, partition), 12290);
+
+        // batch too large
+        batchIdx.set(Integer.MAX_VALUE);
+        sink.write(record);
+        sink.flush();
+        assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+        // batch too large, entryId changed,
+        // offset stays the same
+        entryId.incrementAndGet();
+        sink.write(record);
+        sink.flush();
+        assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+        // max usable bits for ledger: 64 - 28 used for entry + batch
+        long lastLedger = 1 << (64 - 28);
+        // max usable bits for ledger: 28 - 12 used for batch
+        long lastEntry = 1 << (28 - 12);
+        ledgerId.set(lastLedger);
+        entryId.set(lastEntry);
+        Set<Long> seenOffsets = new HashSet<>(4096);
+        // offsets are unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            assertFalse(seenOffsets.contains(offset));
+            seenOffsets.add(offset);
+        }
+
+        ledgerId.set(0);
+        entryId.set(0);
+        seenOffsets.clear();
+        // offsets are unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            assertFalse(seenOffsets.contains(offset));
+            seenOffsets.add(offset);
+        }
+
+        sink.close();
+    }
+
+    @Test
+    public void offsetNoIndexNoBatchTest() throws Exception {
+        props.put("useIndexAsOffset", "false");
+        props.put("maxBatchBitsForOffset", "0");
+
+        final AtomicLong ledgerId = new AtomicLong(0L);
+        final AtomicLong entryId = new AtomicLong(0L);
+        final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+        when(msg.hasIndex()).thenReturn(false);
+
+        final String topicName = "testTopic";
+        final int partition = 1;
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName(topicName)
+                .partition(partition)
+                .message(msg)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .schema(Schema.STRING)
+                .build();
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+        sink.open(props, context);
+
+        // offset is -1 before any data is written (aka no offset)
+        assertEquals(sink.currentOffset(topicName, partition), -1L);
+
+        sink.write(record);
+        sink.flush();
+
+        // offset is 0 for the first written record
+        assertEquals(sink.currentOffset(topicName, partition), 0);
+
+        entryId.set(1);
+        sink.write(record);
+        sink.flush();
+        // offset is 1 for the second written record
+        assertEquals(sink.currentOffset(topicName, partition), 1);
+
+        sink.close();
+
+        // close the producer, open again
+        sink = new KafkaConnectSink();
+        when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build());
+        sink.open(props, context);
+
+        // offset is 1 after reopening the producer
+        assertEquals(sink.currentOffset(topicName, partition), 1);
+
+        entryId.set(2);
+        sink.write(record);
+        sink.flush();
+        // offset is 2 for the next written record
+        assertEquals(sink.currentOffset(topicName, partition), 2);
+
+        // use of index is disabled
+        entryId.set(999);
+        when(msg.hasIndex()).thenReturn(true);
+        when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+        sink.write(record);
+        sink.flush();
+        // offset is 999 for the next written record, index is disabled
+        assertEquals(sink.currentOffset(topicName, partition), 999);
+
+        final AtomicInteger batchIdx = new AtomicInteger(2);
+
+        entryId.set(3);
+        when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+        when(msg.hasIndex()).thenReturn(false);
+        sink.write(record);
+        sink.flush();
+        // offset does not includes batch - it disabled
+        assertEquals(sink.currentOffset(topicName, partition), 3);
+
+        // max usable bits for ledger: 64 - 28 used for entry + batch
+        long lastLedger = 1 << (64 - 28);
+        // max usable bits for ledger: 28 - 12 used for batch
+        long lastEntry = 1 << (28 - 12);
+        ledgerId.set(lastLedger);
+        entryId.set(lastEntry);
+        Set<Long> seenOffsets = new HashSet<>(4096);
+        // offsets are not unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            seenOffsets.add(offset);
+        }
+        assertEquals(seenOffsets.size(), 1);
+
         sink.close();
     }