You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/18 08:09:56 UTC

[pulsar-adapters] branch master updated: Fixed integration tests (#22)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
     new 9801c43  Fixed integration tests (#22)
9801c43 is described below

commit 9801c43898f1b31c6335382c5ea16eac200b603d
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jun 18 01:09:50 2021 -0700

    Fixed integration tests (#22)
    
    ### Motivation
    
    KafkaApiTest is failing
    
    ### Modifications
    
    1. Conversion between a Pulsar topic name and a Kafka TopicPartition produced a TopicPartition whose topic name still carried the "-partition-<partition idx>" suffix.
    
    2. Seek was not working correctly:
    
    PulsarKafkaConsumer seeks to beginning, as asked.
    Clears lastReceivedOffset in the process.
    
    On poll, it checks:
    ```
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    	resetOffsets(tp);
                }
    ```
    The seek didn't update unpolledPartitions, so this check triggered resetOffsets, which uses the default reset strategy and therefore seeks to the end.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
    This change is already covered by existing tests, such as KafkaApiTest.
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
    No
    
    ### Documentation
    
      - Does this pull request introduce a new feature? NO
---
 .../clients/consumer/PulsarKafkaConsumer.java      | 46 ++++++++++++++--------
 .../test/KafkaProducerSimpleConsumerTest.java      |  2 +
 .../spark/SparkStreamingPulsarReceiverTest.java    | 23 +++++------
 3 files changed, 44 insertions(+), 27 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 9ef4e96..ce9b9d2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.time.Duration;
 import java.util.AbstractMap;
@@ -59,7 +58,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -277,9 +275,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                         CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                 .topic(partitionName).subscribeAsync();
                         int partitionIndex = i;
-                        TopicPartition tp = new TopicPartition(
-                            TopicName.get(topic).getLocalName(),
-                            partitionIndex);
+                        TopicPartition tp = normalizedTopicPartition(topic, partitionIndex);
                         futures.add(future.thenApply(consumer -> {
                             log.info("Add consumer {} for partition {}", consumer, tp);
                             consumers.putIfAbsent(tp, consumer);
@@ -291,9 +287,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                     // Topic has a single partition
                     CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                             .subscribeAsync();
-                    TopicPartition tp = new TopicPartition(
-                        TopicName.get(topic).getLocalName(),
-                        0);
+                    TopicPartition tp = normalizedTopicPartition(topic, 0);
                     futures.add(future.thenApply(consumer -> {
                         log.info("Add consumer {} for partition {}", consumer, tp);
                         consumers.putIfAbsent(tp, consumer);
@@ -327,6 +321,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         }
     }
 
+    private TopicPartition normalizedTopicPartition(TopicPartition tp) {
+        return normalizedTopicPartition(tp.topic(), tp.partition());
+    }
+
+    private TopicPartition normalizedTopicPartition(String topic, int partition) {
+        String name = TopicName.get(topic).getPartitionedTopicName();
+        return new TopicPartition(name, partition);
+    }
+
     @Override
     public void assign(Collection<TopicPartition> partitions) {
         Set<String> topics = partitions.stream().map(p -> p.topic()).collect(Collectors.toSet());
@@ -372,7 +375,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
             while (item != null) {
                 TopicName topicName = TopicName.get(item.consumer.getTopic());
-                String topic = topicName.getLocalName();
+                String topic = topicName.getPartitionedTopicName();
                 int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                 Message<byte[]> msg = item.message;
                 MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
@@ -504,12 +507,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        Preconditions.checkNotNull(offsets);
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
         applyConsumerInterceptorsOnCommit(interceptors, offsets);
-        offsets.forEach((topicPartition, offsetAndMetadata) -> {
+        offsets.forEach((tp, offsetAndMetadata) -> {
+            TopicPartition topicPartition = normalizedTopicPartition(tp);
             org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
-            lastCommittedOffset.put(topicPartition, offsetAndMetadata);
+
+            lastCommittedOffset.put(tp, offsetAndMetadata);
             futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
         });
 
@@ -566,7 +572,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     @Override
     public void seek(TopicPartition partition, long offset) {
         MessageId msgId = MessageIdUtils.getMessageId(offset);
-        org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
+        TopicPartition topicPartition = normalizedTopicPartition(partition);
+        org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
         if (c == null) {
             throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
         }
@@ -594,12 +601,14 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         lastReceivedOffset.clear();
         
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+            TopicPartition normalizedTp = normalizedTopicPartition(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
             } else {
                 futures.add(c.seekAsync(MessageId.earliest));
+                unpolledPartitions.add(tp);
             }
         }
 
@@ -617,12 +626,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         lastReceivedOffset.clear();
 
         for (TopicPartition tp : partitions) {
-            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+            TopicPartition normalizedTp = normalizedTopicPartition(tp);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(normalizedTp);
+
             if (c == null) {
                 futures.add(FutureUtil.failedFuture(
                         new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
             } else {
                 futures.add(c.seekAsync(MessageId.latest));
+                unpolledPartitions.add(tp);
             }
         }
 
@@ -712,7 +724,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     @Override
     public void pause(Collection<TopicPartition> partitions) {
         partitions.forEach(p -> {
-            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+            TopicPartition topicPartition = normalizedTopicPartition(p);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
             if (c != null) {
                 c.pause();
             }
@@ -722,7 +735,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     @Override
     public void resume(Collection<TopicPartition> partitions) {
         partitions.forEach(p -> {
-            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(p);
+            TopicPartition topicPartition = normalizedTopicPartition(p);
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(topicPartition);
             if (c != null) {
                 c.resume();
             }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
index 2177021..c2a024b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/test/java/org/apache/pulsar/client/kafka/test/KafkaProducerSimpleConsumerTest.java
@@ -141,6 +141,7 @@ public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase {
             producer.send(message);
         }
         producer.close();
+        Thread.sleep(500);
 
         // (2) Consume using simple consumer
         PulsarKafkaSimpleConsumer consumer = new PulsarKafkaSimpleConsumer(serviceUrl, 0, 0, 0, "clientId");
@@ -158,6 +159,7 @@ public class KafkaProducerSimpleConsumerTest extends ProducerConsumerBase {
                 .build();
         FetchResponse fetchResponse = consumer.fetch(fReq);
 
+        Thread.sleep(500);
         long lastOffset = 0;
         MessageId offset = null;
         for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
diff --git a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
index bd619a5..443514c 100644
--- a/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
+++ b/tests/pulsar-spark-test/src/test/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiverTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.Consumer;
@@ -48,7 +49,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
     private static final String EXPECTED_MESSAGE = "pulsar-spark test message";
 
     @Test(dataProvider = "ServiceUrls")
-    public void testReceivedMessage(String serviceUrl) throws Exception {
+    public void testReceivedMessage(Supplier<String> serviceUrl) throws Exception {
         ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
 
         Set<String> set = new HashSet<>();
@@ -68,14 +69,14 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
         consConf.setMessageListener(msgListener);
 
         SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
-            serviceUrl,
+            serviceUrl.get(),
             consConf,
             new AuthenticationDisabled());
 
         receiver.onStart();
         waitForTransmission();
 
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
         Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
         producer.send(EXPECTED_MESSAGE.getBytes());
 
@@ -85,7 +86,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
     }
 
     @Test(dataProvider = "ServiceUrls")
-    public void testDefaultSettingsOfReceiver(String serviceUrl) {
+    public void testDefaultSettingsOfReceiver(Supplier<String> serviceUrl) {
         ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
 
         Set<String> set = new HashSet<>();
@@ -94,7 +95,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
         consConf.setSubscriptionName(SUBS);
 
         SparkStreamingPulsarReceiver receiver = new SparkStreamingPulsarReceiver(
-            serviceUrl,
+            serviceUrl.get(),
             consConf,
             new AuthenticationDisabled());
 
@@ -103,7 +104,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
     }
 
     @Test(dataProvider = "ServiceUrls")
-    public void testSharedSubscription(String serviceUrl) throws Exception {
+    public void testSharedSubscription(Supplier<String> serviceUrl) throws Exception {
         ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
 
         Set<String> set = new HashSet<>();
@@ -120,12 +121,12 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
         });
 
         SparkStreamingPulsarReceiver receiver1 = new SparkStreamingPulsarReceiver(
-            serviceUrl,
+            serviceUrl.get(),
             consConf,
             new AuthenticationDisabled());
 
         SparkStreamingPulsarReceiver receiver2 = new SparkStreamingPulsarReceiver(
-                serviceUrl,
+                serviceUrl.get(),
                 consConf,
                 new AuthenticationDisabled());
 
@@ -133,7 +134,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
         receiver2.onStart();
         waitForTransmission();
 
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build();
         Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
         for (int i = 0; i < 10; i++) {
             producer.send(EXPECTED_MESSAGE.getBytes());
@@ -149,8 +150,8 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
     @Test(expectedExceptions = NullPointerException.class,
             expectedExceptionsMessageRegExp = "ConsumerConfigurationData must not be null",
             dataProvider = "ServiceUrls")
-    public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
-        new SparkStreamingPulsarReceiver(serviceUrl, null, new AuthenticationDisabled());
+    public void testReceiverWhenClientConfigurationIsNull(Supplier<String> serviceUrl) {
+        new SparkStreamingPulsarReceiver(serviceUrl.get(), null, new AuthenticationDisabled());
     }
 
     private static void waitForTransmission() {