You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/26 15:13:49 UTC

[pulsar] branch branch-2.11 updated: [fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 11ec959c454 [fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)
11ec959c454 is described below

commit 11ec959c4546e824aea50d4bb1ba6bd8bac7ee26
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Sep 26 12:29:52 2022 +0200

    [fix][connector] KCA: use reflection to get pulsar-client impl classes (#17835)
    
    (cherry picked from commit 4c22159f5a972e7a92382b9a90c6c70c43c5d166)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 106 ++++++++++++++-------
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  33 +++++++
 2 files changed, 105 insertions(+), 34 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 9dd574ffae3..719642edf13 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -26,6 +26,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -52,9 +55,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
@@ -320,47 +320,85 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             }
 
             MessageId messageId = sourceRecord.getMessage().get().getMessageId();
-            MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
-                    ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
-                    : messageId);
-
             // sourceRecord.getRecordSequence() is not unique
             // for the messages from the same batch.
             // Special case for FunctionCommon.getSequenceId()
-            if (maxBatchBitsForOffset > 0 && msgId instanceof BatchMessageIdImpl) {
-                BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
-                long ledgerId = batchMsgId.getLedgerId();
-                long entryId = batchMsgId.getEntryId();
-
-                if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
-                    log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
-                }
-
-                int batchIdx = batchMsgId.getBatchIndex();
-
-                if (batchIdx < 0) {
-                    // Should not happen unless data corruption
-                    log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
-                    batchIdx = 0;
+            if (maxBatchBitsForOffset > 0) {
+                BatchMessageSequenceRef messageSequenceRef = getMessageSequenceRefForBatchMessage(messageId);
+                if (messageSequenceRef != null) {
+                    long ledgerId = messageSequenceRef.getLedgerId();
+                    long entryId = messageSequenceRef.getEntryId();
+
+                    if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
+                        log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
+                    }
+                    int batchIdx = messageSequenceRef.getBatchIdx();
+
+                    if (batchIdx < 0) {
+                        // Should not happen unless data corruption
+                        log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
+                        batchIdx = 0;
+                    }
+                    if (batchIdx > (1 << maxBatchBitsForOffset)) {
+                        log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
+                    }
+                    // Combine entry id and batchIdx
+                    entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
+
+                    // The same as FunctionCommon.getSequenceId():
+                    // Combine ledger id and entry id to form offset
+                    // Use less than 32 bits to represent entry id since it will get
+                    // rolled over way before overflowing the max int range
+                    long offset = (ledgerId << 28) | entryId;
+                    return offset;
                 }
-                if (batchIdx > (1 << maxBatchBitsForOffset)) {
-                    log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
-                }
-                // Combine entry id and batchIdx
-                entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
-
-                // The same as FunctionCommon.getSequenceId():
-                // Combine ledger id and entry id to form offset
-                // Use less than 32 bits to represent entry id since it will get
-                // rolled over way before overflowing the max int range
-                long offset = (ledgerId << 28) | entryId;
-                return offset;
             }
         }
         return sourceRecord.getRecordSequence()
                 .orElse(-1L);
     }
 
+    @Getter
+    @AllArgsConstructor
+    static class BatchMessageSequenceRef {
+        long ledgerId;
+        long entryId;
+        int batchIdx;
+    }
+
+    @VisibleForTesting
+    static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId messageId) {
+        long ledgerId;
+        long entryId;
+        int batchIdx;
+        try {
+            try {
+                messageId = (MessageId) messageId.getClass().getDeclaredMethod("getInnerMessageId").invoke(messageId);
+            } catch (NoSuchMethodException noSuchMethodException) {
+                // not a TopicMessageIdImpl
+            }
+
+            try {
+                batchIdx = (int) messageId.getClass().getDeclaredMethod("getBatchIndex").invoke(messageId);
+            } catch (NoSuchMethodException noSuchMethodException) {
+                // not a BatchMessageIdImpl, returning null to use the standard sequenceId
+                return null;
+            }
+
+            // if getBatchIndex exists it means messageId is a 'BatchMessageIdImpl' instance.
+            final Class<?> messageIdImplClass = messageId.getClass().getSuperclass();
+
+            ledgerId = (long) messageIdImplClass.getDeclaredMethod("getLedgerId").invoke(messageId);
+            entryId = (long) messageIdImplClass.getDeclaredMethod("getEntryId").invoke(messageId);
+        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
+            log.error("Unexpected error while retrieving sequenceId, messageId class: {}, error: {}",
+                    messageId.getClass().getName(), ex.getMessage(), ex);
+            throw new RuntimeException(ex);
+        }
+
+        return new BatchMessageSequenceRef(ledgerId, entryId, batchIdx);
+    }
+
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 4f3996c8fde..0c260b4eb65 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -111,6 +112,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -1432,6 +1434,37 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         return rec;
     }
 
+    @Test
+    public void testGetMessageSequenceRefForBatchMessage() throws Exception {
+        long ledgerId = 123L;
+        long entryId = Long.MAX_VALUE;
+        int batchIdx = 16;
+
+        KafkaConnectSink.BatchMessageSequenceRef ref = KafkaConnectSink
+                .getMessageSequenceRefForBatchMessage(new MessageIdImpl(ledgerId, entryId, 0));
+        assertNull(ref);
+
+        ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+                        new TopicMessageIdImpl("topic-0", "topic", new MessageIdImpl(ledgerId, entryId, 0))
+        );
+        assertNull(ref);
+
+        ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+                new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx));
+
+        assertEquals(ref.getLedgerId(), ledgerId);
+        assertEquals(ref.getEntryId(), entryId);
+        assertEquals(ref.getBatchIdx(), batchIdx);
+
+        ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
+                new TopicMessageIdImpl("topic-0", "topic", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
+        );
+
+        assertEquals(ref.getLedgerId(), ledgerId);
+        assertEquals(ref.getEntryId(), entryId);
+        assertEquals(ref.getBatchIdx(), batchIdx);
+    }
+
     @SneakyThrows
     private java.util.Date getDateFromString(String dateInString) {
         SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");