You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/14 06:02:15 UTC

[flink] 02/06: [FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e09a691dbd67457149558783a4597aa4c4821e99
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Mon Sep 5 23:51:10 2022 +0800

    [FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator.
---
 .../testutils/runtime/PulsarRuntimeOperator.java   | 155 ++++++++-------------
 1 file changed, 58 insertions(+), 97 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index be387c583b1..1fe98128e3d 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -53,7 +54,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
@@ -76,7 +76,8 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
+import static org.apache.pulsar.client.api.MessageId.earliest;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
 import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
 import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
 import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
@@ -95,8 +96,6 @@ public class PulsarRuntimeOperator implements Closeable {
     private final String adminUrl;
     private final PulsarClient client;
     private final PulsarAdmin admin;
-    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Producer<?>>> producers;
-    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Consumer<?>>> consumers;
 
     public PulsarRuntimeOperator(String serviceUrl, String adminUrl) {
         this(serviceUrl, serviceUrl, adminUrl, adminUrl);
@@ -117,8 +116,6 @@ public class PulsarRuntimeOperator implements Closeable {
                                         .enableTransaction(true)
                                         .build());
         this.admin = sneakyClient(() -> PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
-        this.producers = new ConcurrentHashMap<>();
-        this.consumers = new ConcurrentHashMap<>();
     }
 
     /**
@@ -169,7 +166,8 @@ public class PulsarRuntimeOperator implements Closeable {
     }
 
     /**
-     * Create a pulsar topic with given partition number.
+     * Create a Pulsar topic with the given partition number if the topic doesn't exist. An
+     * existing topic is left untouched, so make sure this is used correctly in the testing code.
      *
      * @param topic The name of the topic.
      * @param numberOfPartitions The number of partitions. We would create a non-partitioned topic
@@ -219,10 +217,6 @@ public class PulsarRuntimeOperator implements Closeable {
             return;
         }
 
-        // Close all the available consumers and producers.
-        removeConsumers(topic);
-        removeProducers(topic);
-
         if (metadata.partitions == NON_PARTITIONED) {
             sneakyAdmin(() -> admin().topics().delete(topicName));
         } else {
@@ -305,8 +299,8 @@ public class PulsarRuntimeOperator implements Closeable {
      */
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, String key, Collection<T> messages) {
+        Producer<T> producer = createProducer(topic, schema);
         try {
-            Producer<T> producer = createProducer(topic, schema);
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
@@ -322,6 +316,15 @@ public class PulsarRuntimeOperator implements Closeable {
         } catch (PulsarClientException e) {
             sneakyThrow(e);
             return emptyList();
+        } finally {
+            try {
+                // Wait for all the pending messages to be sent to Pulsar.
+                producer.flush();
+                // Closing directly without a flush would drop all the pending messages.
+                producer.close();
+            } catch (PulsarClientException e) {
+                // Just ignore the exception here.
+            }
         }
     }
 
@@ -330,9 +333,10 @@ public class PulsarRuntimeOperator implements Closeable {
      * message from this topic.
      */
     public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
-        try {
-            Consumer<T> consumer = createConsumer(topic, schema);
-            return drainOneMessage(consumer);
+        try (Consumer<T> consumer = createConsumer(topic, schema)) {
+            Message<T> message = consumer.receive();
+            consumer.acknowledge(message.getMessageId());
+            return message;
         } catch (PulsarClientException e) {
             sneakyThrow(e);
             return null;
@@ -344,10 +348,10 @@ public class PulsarRuntimeOperator implements Closeable {
      * timeout. A null message would be returned if no message has been consumed from Pulsar.
      */
     public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
-        try {
-            Consumer<T> consumer = createConsumer(topic, schema);
-            Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
-            consumer.acknowledgeCumulative(message.getMessageId());
+        try (Consumer<T> consumer = createConsumer(topic, schema)) {
+            Message<T> message =
+                    consumer.receive(Math.toIntExact(timeout.toMillis()), MILLISECONDS);
+            consumer.acknowledge(message.getMessageId());
 
             return message;
         } catch (Exception e) {
@@ -371,12 +375,12 @@ public class PulsarRuntimeOperator implements Closeable {
             return singletonList(message);
         } else {
             // Drain a fixed number of messages.
-            try {
-                Consumer<T> consumer = createConsumer(topic, schema);
+            try (Consumer<T> consumer = createConsumer(topic, schema)) {
                 List<Message<T>> messages = new ArrayList<>(counts);
                 for (int i = 0; i < counts; i++) {
-                    Message<T> message = drainOneMessage(consumer);
+                    Message<T> message = consumer.receive();
                     messages.add(message);
+                    consumer.acknowledge(message.getMessageId());
                 }
                 return messages;
             } catch (PulsarClientException e) {
@@ -459,9 +463,6 @@ public class PulsarRuntimeOperator implements Closeable {
     /** This method is used for test framework. You can't close this operator manually. */
     @Override
     public void close() throws IOException {
-        producers.clear();
-        consumers.clear();
-
         if (admin != null) {
             admin.close();
         }
@@ -474,93 +475,53 @@ public class PulsarRuntimeOperator implements Closeable {
 
     private void createNonPartitionedTopic(String topic) {
         try {
-            admin().lookups().lookupTopic(topic);
-            sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0));
+            admin().topics().createNonPartitionedTopic(topic);
         } catch (PulsarAdminException e) {
-            sneakyAdmin(() -> admin().topics().createNonPartitionedTopic(topic));
+            if (!(e instanceof ConflictException
+                    && e.getMessage().equals("This topic already exists"))) {
+                sneakyThrow(e);
+            }
         }
     }
 
     private void createPartitionedTopic(String topic, int numberOfPartitions) {
         try {
-            admin().lookups().lookupPartitionedTopic(topic);
-            sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0));
+            admin().topics().createPartitionedTopic(topic, numberOfPartitions);
         } catch (PulsarAdminException e) {
-            sneakyAdmin(() -> admin().topics().createPartitionedTopic(topic, numberOfPartitions));
+            if (!(e instanceof ConflictException
+                    && e.getMessage().equals("This topic already exists"))) {
+                sneakyThrow(e);
+            }
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private <T> Producer<T> createProducer(String topic, Schema<T> schema)
-            throws PulsarClientException {
-        TopicName topicName = TopicName.get(topic);
-        String name = topicName.getPartitionedTopicName();
-        int index = topicName.getPartitionIndex();
-        ConcurrentHashMap<Integer, Producer<?>> topicProducers =
-                producers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());
-
-        return (Producer<T>)
-                topicProducers.computeIfAbsent(
-                        index,
-                        i -> {
-                            ProducerBuilder<T> builder =
-                                    client().newProducer(schema)
-                                            .topic(topic)
-                                            .enableBatching(false)
-                                            .enableMultiSchema(true);
-
-                            return sneakyClient(builder::create);
-                        });
-    }
+    private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
+        ProducerBuilder<T> builder =
+                client().newProducer(schema)
+                        .topic(topic)
+                        .enableBatching(false)
+                        .enableMultiSchema(true)
+                        .accessMode(Shared);
 
-    @SuppressWarnings("unchecked")
-    private <T> Consumer<T> createConsumer(String topic, Schema<T> schema)
-            throws PulsarClientException {
-        TopicName topicName = TopicName.get(topic);
-        String name = topicName.getPartitionedTopicName();
-        int index = topicName.getPartitionIndex();
-        ConcurrentHashMap<Integer, Consumer<?>> topicConsumers =
-                consumers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());
-
-        return (Consumer<T>)
-                topicConsumers.computeIfAbsent(
-                        index,
-                        i -> {
-                            ConsumerBuilder<T> builder =
-                                    client().newConsumer(schema)
-                                            .topic(topic)
-                                            .subscriptionName(SUBSCRIPTION_NAME)
-                                            .subscriptionMode(Durable)
-                                            .subscriptionType(Exclusive)
-                                            .subscriptionInitialPosition(Earliest);
-
-                            return sneakyClient(builder::subscribe);
-                        });
+        return sneakyClient(builder::create);
     }
 
-    private void removeProducers(String topic) {
-        String topicName = topicName(topic);
-        ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
-        if (integerProducers != null) {
-            for (Producer<?> producer : integerProducers.values()) {
-                sneakyClient(producer::close);
-            }
+    private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
+        // Create the subscription at the earliest position if it doesn't exist yet.
+        List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
+        if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
+            sneakyAdmin(
+                    () -> admin().topics().createSubscription(topic, SUBSCRIPTION_NAME, earliest));
         }
-    }
 
-    private void removeConsumers(String topic) {
-        String topicName = topicName(topic);
-        ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
-        if (integerConsumers != null) {
-            for (Consumer<?> consumer : integerConsumers.values()) {
-                sneakyClient(consumer::close);
-            }
-        }
-    }
+        // Create the consumer without the initial position.
+        ConsumerBuilder<T> builder =
+                client().newConsumer(schema)
+                        .topic(topic)
+                        .subscriptionName(SUBSCRIPTION_NAME)
+                        .subscriptionMode(Durable)
+                        .subscriptionType(Exclusive);
 
-    private <T> Message<T> drainOneMessage(Consumer<T> consumer) throws PulsarClientException {
-        Message<T> message = consumer.receive();
-        consumer.acknowledgeCumulative(message.getMessageId());
-        return message;
+        return sneakyClient(builder::subscribe);
     }
 }