You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this rendering.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/26 15:13:49 UTC
[pulsar] branch branch-2.11 updated: [fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 11ec959c454 [fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)
11ec959c454 is described below
commit 11ec959c4546e824aea50d4bb1ba6bd8bac7ee26
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Sep 26 12:29:52 2022 +0200
[fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)
(cherry picked from commit 4c22159f5a972e7a92382b9a90c6c70c43c5d166)
---
.../pulsar/io/kafka/connect/KafkaConnectSink.java | 106 ++++++++++++++-------
.../io/kafka/connect/KafkaConnectSinkTest.java | 33 +++++++
2 files changed, 105 insertions(+), 34 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 9dd574ffae3..719642edf13 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -26,6 +26,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -52,9 +55,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
@@ -320,47 +320,85 @@ public class KafkaConnectSink implements Sink<GenericObject> {
}
MessageId messageId = sourceRecord.getMessage().get().getMessageId();
- MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
- ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
- : messageId);
-
// sourceRecord.getRecordSequence() is not unique
// for the messages from the same batch.
// Special case for FunctionCommon.getSequenceId()
- if (maxBatchBitsForOffset > 0 && msgId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
- long ledgerId = batchMsgId.getLedgerId();
- long entryId = batchMsgId.getEntryId();
-
- if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
- log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
- }
-
- int batchIdx = batchMsgId.getBatchIndex();
-
- if (batchIdx < 0) {
- // Should not happen unless data corruption
- log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
- batchIdx = 0;
+ if (maxBatchBitsForOffset > 0) {
+ BatchMessageSequenceRef messageSequenceRef = getMessageSequenceRefForBatchMessage(messageId);
+ if (messageSequenceRef != null) {
+ long ledgerId = messageSequenceRef.getLedgerId();
+ long entryId = messageSequenceRef.getEntryId();
+ // entryId gets (28 - maxBatchBitsForOffset) bits, so 2^(28 - bits) is already an overflow
+ if (entryId >= (1L << (28 - maxBatchBitsForOffset))) {
+ log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
+ }
+ int batchIdx = messageSequenceRef.getBatchIdx();
+ // batchIdx must lie in [0, 2^maxBatchBitsForOffset) to fit in the low bits below the entry id
+ if (batchIdx < 0) {
+ // Should not happen unless data corruption
+ log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
+ batchIdx = 0;
+ }
+ if (batchIdx >= (1L << maxBatchBitsForOffset)) {
+ log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
+ }
+ // Combine entry id and batchIdx
+ entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
+
+ // The same as FunctionCommon.getSequenceId():
+ // Combine ledger id and entry id to form offset
+ // Use less than 32 bits to represent entry id since it will get
+ // rolled over way before overflowing the max int range
+ long offset = (ledgerId << 28) | entryId;
+ return offset;
}
- if (batchIdx > (1 << maxBatchBitsForOffset)) {
- log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
- }
- // Combine entry id and batchIdx
- entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
-
- // The same as FunctionCommon.getSequenceId():
- // Combine ledger id and entry id to form offset
- // Use less than 32 bits to represent entry id since it will get
- // rolled over way before overflowing the max int range
- long offset = (ledgerId << 28) | entryId;
- return offset;
}
}
return sourceRecord.getRecordSequence()
.orElse(-1L);
}
+ @Getter
+ @AllArgsConstructor
+ static class BatchMessageSequenceRef { // ledger id, entry id and batch index of a batched message
+ final long ledgerId;
+ final long entryId;
+ final int batchIdx;
+ }
+ /** Reads the batch position of {@code messageId} via reflection; returns null for non-batch ids. */
+ @VisibleForTesting
+ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId messageId) {
+ long ledgerId;
+ long entryId;
+ int batchIdx;
+ try {
+ try {
+ messageId = (MessageId) messageId.getClass().getDeclaredMethod("getInnerMessageId").invoke(messageId);
+ } catch (NoSuchMethodException noSuchMethodException) {
+ // not a TopicMessageIdImpl
+ }
+ // only an id whose own class declares getBatchIndex() is treated as a batch message id
+ try {
+ batchIdx = (int) messageId.getClass().getDeclaredMethod("getBatchIndex").invoke(messageId);
+ } catch (NoSuchMethodException noSuchMethodException) {
+ // not a BatchMessageIdImpl, returning null to use the standard sequenceId
+ return null;
+ }
+
+ // the ledger/entry getters may be inherited: getMethod() searches the whole superclass chain
+ final Class<?> messageIdClass = messageId.getClass();
+
+ ledgerId = (long) messageIdClass.getMethod("getLedgerId").invoke(messageId);
+ entryId = (long) messageIdClass.getMethod("getEntryId").invoke(messageId);
+ } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
+ log.error("Unexpected error while retrieving sequenceId, messageId class: {}, error: {}",
+ messageId.getClass().getName(), ex.getMessage(), ex);
+ throw new RuntimeException(ex);
+ }
+
+ return new BatchMessageSequenceRef(ledgerId, entryId, batchIdx);
+ }
+
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 4f3996c8fde..0c260b4eb65 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -111,6 +112,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -1432,6 +1434,37 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
return rec;
}
+ @Test
+ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
+ long ledgerId = 123L;
+ long entryId = Long.MAX_VALUE;
+ int batchIdx = 16;
+ // non-batch ids (plain or topic-wrapped) yield null; batch ids (plain or topic-wrapped) yield their position
+ KafkaConnectSink.BatchMessageSequenceRef ref = KafkaConnectSink
+ .getMessageSequenceRefForBatchMessage(new MessageIdImpl(ledgerId, entryId, 0));
+ assertNull(ref);
+
+ ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+ new TopicMessageIdImpl("topic-0", "topic", new MessageIdImpl(ledgerId, entryId, 0))
+ );
+ assertNull(ref);
+
+ ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+ new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx));
+ assertTrue(ref != null, "BatchMessageIdImpl should yield a BatchMessageSequenceRef");
+ assertEquals(ref.getLedgerId(), ledgerId);
+ assertEquals(ref.getEntryId(), entryId);
+ assertEquals(ref.getBatchIdx(), batchIdx);
+
+ ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+ new TopicMessageIdImpl("topic-0", "topic", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
+ );
+ assertTrue(ref != null, "TopicMessageIdImpl wrapping a BatchMessageIdImpl should yield a BatchMessageSequenceRef");
+ assertEquals(ref.getLedgerId(), ledgerId);
+ assertEquals(ref.getEntryId(), entryId);
+ assertEquals(ref.getBatchIdx(), batchIdx);
+ }
+
@SneakyThrows
private java.util.Date getDateFromString(String dateInString) {
SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");