Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/06/17 07:35:12 UTC
[pulsar] branch master updated: KCA to use index (if available) instead of sequence and to handle batched messages' non-unique sequenceIds (#16098)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a18c01df761 KCA to use index (if available) instead of sequence and to handle batched messages' non-unique sequenceIds (#16098)
a18c01df761 is described below
commit a18c01df7617b71f936561ac0abae62fb175f063
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 17 00:35:02 2022 -0700
KCA to use index (if available) instead of sequence and to handle batched messages' non-unique sequenceIds (#16098)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 67 +++++++-
.../connect/PulsarKafkaConnectSinkConfig.java | 14 ++
.../io/kafka/connect/KafkaConnectSinkTest.java | 181 ++++++++++++++++++++-
3 files changed, 259 insertions(+), 3 deletions(-)
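
For reference, the offset layout this change introduces, as a minimal standalone sketch (the helper below is illustrative only; the actual logic lives in KafkaConnectSink.getMessageOffset in the diff):

    // Sketch of the packing used when no broker-provided index is available.
    // A batchIdx < 0 means the message is not part of a batch. When the message
    // is batched and maxBatchBitsForOffset > 0, the low bits of the 28-bit entry
    // field carry the batch index; the layout otherwise matches
    // FunctionCommon.getSequenceId().
    static long toOffset(long ledgerId, long entryId, int batchIdx, int batchBits) {
        if (batchBits > 0 && batchIdx >= 0) {
            entryId = (entryId << batchBits) | batchIdx; // reserve low bits for batch index
        }
        return (ledgerId << 28) | entryId;
    }

If the broker exposes an index (AppendIndexMetadataInterceptor plus enableExposingBrokerEntryMetadataToClient=true), that index is used directly instead.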
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 31f7cbf6399..e1a34f64407 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -48,9 +48,13 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
@@ -91,6 +95,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
+ private int maxBatchBitsForOffset = 12;
+ private boolean useIndexAsOffset = true;
+
@Override
public void write(Record<GenericObject> sourceRecord) {
if (log.isDebugEnabled()) {
@@ -147,6 +154,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
+ useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
+ maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
+ Preconditions.checkArgument(maxBatchBitsForOffset <= 20,
+ "Cannot use more than 20 bits for maxBatchBitsForOffset");
+
String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass();
kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
@@ -240,6 +252,58 @@ public class KafkaConnectSink implements Sink<GenericObject> {
}
}
+ private long getMessageOffset(Record<GenericObject> sourceRecord) {
+
+ if (sourceRecord.getMessage().isPresent()) {
+ // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
+ // Requires enableExposingBrokerEntryMetadataToClient=true on brokers.
+ if (useIndexAsOffset && sourceRecord.getMessage().get().hasIndex()) {
+ return sourceRecord.getMessage().get()
+ .getIndex().orElse(-1L);
+ }
+
+ MessageId messageId = sourceRecord.getMessage().get().getMessageId();
+ MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+ ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+ : messageId);
+
+ // sourceRecord.getRecordSequence() is not unique
+ // for the messages from the same batch.
+ // Special case for FunctionCommon.getSequenceId()
+ if (maxBatchBitsForOffset > 0 && msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+ long ledgerId = batchMsgId.getLedgerId();
+ long entryId = batchMsgId.getEntryId();
+
+ if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
+ log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
+ }
+
+ int batchIdx = batchMsgId.getBatchIndex();
+
+ if (batchIdx < 0) {
+ // Should not happen unless data corruption
+ log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
+ batchIdx = 0;
+ }
+ if (batchIdx > (1 << maxBatchBitsForOffset)) {
+ log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
+ }
+ // Combine entry id and batchIdx
+ entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
+
+ // The same as FunctionCommon.getSequenceId():
+ // Combine ledger id and entry id to form offset
+ // Use less than 32 bits to represent entry id since it will get
+ // rolled over way before overflowing the max int range
+ long offset = (ledgerId << 28) | entryId;
+ return offset;
+ }
+ }
+ return sourceRecord.getRecordSequence()
+ .orElse(-1L);
+ }
+
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -282,8 +346,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
}
- long offset = sourceRecord.getRecordSequence()
- .orElse(-1L);
+ long offset = getMessageOffset(sourceRecord);
if (offset < 0) {
log.error("Message without sequenceId. Key: {} Value: {}", key, value);
throw new IllegalStateException("Message without sequenceId");
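
With the default maxBatchBitsForOffset = 12, the 28-bit entry field splits into 16 bits for the entry id and 12 bits for the batch index, leaving 36 bits for the ledger id. A quick check of those limits (plain arithmetic, not code from the commit):

    int batchBits = 12;
    long maxBatchIdx = (1L << batchBits) - 1;        // 4095 messages per batch
    long maxEntryId  = (1L << (28 - batchBits)) - 1; // 65535 entries per ledger
    long maxLedgerId = (1L << (64 - 28)) - 1;        // about 6.9e10 ledgers

Exceeding these bounds does not fail the write; the log.error calls above only warn that offsets may then collide.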
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index b3feea2ab3a..1241f856c55 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
private boolean unwrapKeyValueIfAvailable = true;
+ @FieldDoc(
+ defaultValue = "true",
+ help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+ + "Requires AppendIndexMetadataInterceptor and "
+ + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+ private boolean useIndexAsOffset = true;
+
+ @FieldDoc(
+ defaultValue = "12",
+ help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+ + "0 to disable this behavior (Messages from the same batch will have the same "
+ + "offset which can affect some connectors.)")
+ private int maxBatchBitsForOffset = 12;
+
@FieldDoc(
defaultValue = "false",
help = "Some connectors cannot handle pulsar topic names like persistent://a/b/topic"
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 2c0ea31ac4c..91ae86f97eb 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -75,9 +76,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -91,6 +95,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -928,11 +933,16 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void offsetTest() throws Exception {
+ props.put("useIndexAsOffset", "true");
+ props.put("maxBatchBitsForOffset", "12");
+
+ final AtomicLong ledgerId = new AtomicLong(0L);
final AtomicLong entryId = new AtomicLong(0L);
final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn(rec);
- when(msg.getMessageId()).then(x -> new MessageIdImpl(0, entryId.getAndIncrement(), 0));
+ when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+ when(msg.hasIndex()).thenReturn(false);
final String topicName = "testTopic";
final int partition = 1;
@@ -959,6 +969,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
// offset is 0 for the first written record
assertEquals(sink.currentOffset(topicName, partition), 0);
+ entryId.set(1);
sink.write(record);
sink.flush();
// offset is 1 for the second written record
@@ -976,11 +987,179 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
// offset is 1 after reopening the producer
assertEquals(sink.currentOffset(topicName, partition), 1);
+ entryId.set(2);
sink.write(record);
sink.flush();
// offset is 2 for the next written record
assertEquals(sink.currentOffset(topicName, partition), 2);
+
+ // use index
+ entryId.set(999);
+ when(msg.hasIndex()).thenReturn(true);
+ when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+ sink.write(record);
+ sink.flush();
+ // offset is 777 for the next written record according to index
+ assertEquals(sink.currentOffset(topicName, partition), 777);
+
+ final AtomicInteger batchIdx = new AtomicInteger(2);
+
+ entryId.set(3);
+ when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+ when(msg.hasIndex()).thenReturn(false);
+ sink.write(record);
+ sink.flush();
+ // offset is built from the batch message id and includes the batch index:
+ // (3 << 12) | 2 = 12290
+ assertEquals(sink.currentOffset(topicName, partition), 12290);
+
+ // batch too large
+ batchIdx.set(Integer.MAX_VALUE);
+ sink.write(record);
+ sink.flush();
+ assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+ // batch too large, entryId changed,
+ // offset stays the same
+ entryId.incrementAndGet();
+ sink.write(record);
+ sink.flush();
+ assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+ // max usable bits for ledger: 64 - 28 used for entry + batch
+ long lastLedger = 1L << (64 - 28);
+ // max usable bits for entry: 28 - 12 used for batch
+ long lastEntry = 1 << (28 - 12);
+ ledgerId.set(lastLedger);
+ entryId.set(lastEntry);
+ Set<Long> seenOffsets = new HashSet<>(4096);
+ // offsets are unique
+ for (int i = 0; i < 4096; i++) {
+ batchIdx.set(i);
+ sink.write(record);
+ sink.flush();
+ long offset = sink.currentOffset(topicName, partition);
+ assertFalse(seenOffsets.contains(offset));
+ seenOffsets.add(offset);
+ }
+
+ ledgerId.set(0);
+ entryId.set(0);
+ seenOffsets.clear();
+ // offsets are unique
+ for (int i = 0; i < 4096; i++) {
+ batchIdx.set(i);
+ sink.write(record);
+ sink.flush();
+ long offset = sink.currentOffset(topicName, partition);
+ assertFalse(seenOffsets.contains(offset));
+ seenOffsets.add(offset);
+ }
+
+ sink.close();
+ }
+
+ @Test
+ public void offsetNoIndexNoBatchTest() throws Exception {
+ props.put("useIndexAsOffset", "false");
+ props.put("maxBatchBitsForOffset", "0");
+
+ final AtomicLong ledgerId = new AtomicLong(0L);
+ final AtomicLong entryId = new AtomicLong(0L);
+ final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+ Message msg = mock(MessageImpl.class);
+ when(msg.getValue()).thenReturn(rec);
+ when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+ when(msg.hasIndex()).thenReturn(false);
+
+ final String topicName = "testTopic";
+ final int partition = 1;
+ final AtomicInteger status = new AtomicInteger(0);
+ Record<GenericObject> record = PulsarRecord.<String>builder()
+ .topicName(topicName)
+ .partition(partition)
+ .message(msg)
+ .ackFunction(status::incrementAndGet)
+ .failFunction(status::decrementAndGet)
+ .schema(Schema.STRING)
+ .build();
+
+ KafkaConnectSink sink = new KafkaConnectSink();
+ when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+ sink.open(props, context);
+
+ // offset is -1 before any data is written (aka no offset)
+ assertEquals(sink.currentOffset(topicName, partition), -1L);
+
+ sink.write(record);
+ sink.flush();
+
+ // offset is 0 for the first written record
+ assertEquals(sink.currentOffset(topicName, partition), 0);
+
+ entryId.set(1);
+ sink.write(record);
+ sink.flush();
+ // offset is 1 for the second written record
+ assertEquals(sink.currentOffset(topicName, partition), 1);
+
+ sink.close();
+
+ // close the producer, open again
+ sink = new KafkaConnectSink();
+ when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+ .serviceUrl(brokerUrl.toString())
+ .build());
+ sink.open(props, context);
+
+ // offset is 1 after reopening the producer
+ assertEquals(sink.currentOffset(topicName, partition), 1);
+
+ entryId.set(2);
+ sink.write(record);
+ sink.flush();
+ // offset is 2 for the next written record
+ assertEquals(sink.currentOffset(topicName, partition), 2);
+
+ // use of index is disabled
+ entryId.set(999);
+ when(msg.hasIndex()).thenReturn(true);
+ when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+ sink.write(record);
+ sink.flush();
+ // offset is 999 for the next written record, index is disabled
+ assertEquals(sink.currentOffset(topicName, partition), 999);
+
+ final AtomicInteger batchIdx = new AtomicInteger(2);
+
+ entryId.set(3);
+ when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+ when(msg.hasIndex()).thenReturn(false);
+ sink.write(record);
+ sink.flush();
+ // offset does not include the batch index - batch bits are disabled
+ assertEquals(sink.currentOffset(topicName, partition), 3);
+
+ // max usable bits for ledger: 64 - 28 used for entry + batch
+ long lastLedger = 1L << (64 - 28);
+ // max usable bits for entry: 28 - 12 used for batch
+ long lastEntry = 1 << (28 - 12);
+ ledgerId.set(lastLedger);
+ entryId.set(lastEntry);
+ Set<Long> seenOffsets = new HashSet<>(4096);
+ // offsets are not unique
+ for (int i = 0; i < 4096; i++) {
+ batchIdx.set(i);
+ sink.write(record);
+ sink.flush();
+ long offset = sink.currentOffset(topicName, partition);
+ seenOffsets.add(offset);
+ }
+ assertEquals(seenOffsets.size(), 1);
+
sink.close();
}
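
For debugging offsets produced with batch bits enabled, a hypothetical inverse of the packing (illustrative only, not part of the commit, and only valid while the entry id and batch index stay within their bit budgets):

    static long[] fromOffset(long offset, int batchBits) {
        long ledgerId = offset >>> 28;
        long entryField = offset & ((1L << 28) - 1);
        long entryId = batchBits > 0 ? entryField >>> batchBits : entryField;
        long batchIdx = batchBits > 0 ? entryField & ((1L << batchBits) - 1) : -1;
        return new long[] { ledgerId, entryId, batchIdx };
    }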