You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/14 06:02:15 UTC
[flink] 02/06: [FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator.
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e09a691dbd67457149558783a4597aa4c4821e99
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Mon Sep 5 23:51:10 2022 +0800
[FLINK-27388][Connector/pulsar] Change the topic setup logic in Pulsar runtime operator.
---
.../testutils/runtime/PulsarRuntimeOperator.java | 155 ++++++++-------------
1 file changed, 58 insertions(+), 97 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index be387c583b1..1fe98128e3d 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -53,7 +54,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -76,7 +76,8 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
+import static org.apache.pulsar.client.api.MessageId.earliest;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
@@ -95,8 +96,6 @@ public class PulsarRuntimeOperator implements Closeable {
private final String adminUrl;
private final PulsarClient client;
private final PulsarAdmin admin;
- private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Producer<?>>> producers;
- private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Consumer<?>>> consumers;
public PulsarRuntimeOperator(String serviceUrl, String adminUrl) {
this(serviceUrl, serviceUrl, adminUrl, adminUrl);
@@ -117,8 +116,6 @@ public class PulsarRuntimeOperator implements Closeable {
.enableTransaction(true)
.build());
this.admin = sneakyClient(() -> PulsarAdmin.builder().serviceHttpUrl(adminUrl).build());
- this.producers = new ConcurrentHashMap<>();
- this.consumers = new ConcurrentHashMap<>();
}
/**
@@ -169,7 +166,8 @@ public class PulsarRuntimeOperator implements Closeable {
}
/**
- * Create a pulsar topic with given partition number.
+ * Create a Pulsar topic with the given partition number if the topic doesn't exist. We won't do
+ * anything for an existing topic, so make sure this is used correctly in the testing code.
*
* @param topic The name of the topic.
* @param numberOfPartitions The number of partitions. We would create a non-partitioned topic
@@ -219,10 +217,6 @@ public class PulsarRuntimeOperator implements Closeable {
return;
}
- // Close all the available consumers and producers.
- removeConsumers(topic);
- removeProducers(topic);
-
if (metadata.partitions == NON_PARTITIONED) {
sneakyAdmin(() -> admin().topics().delete(topicName));
} else {
@@ -305,8 +299,8 @@ public class PulsarRuntimeOperator implements Closeable {
*/
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, String key, Collection<T> messages) {
+ Producer<T> producer = createProducer(topic, schema);
try {
- Producer<T> producer = createProducer(topic, schema);
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (T message : messages) {
@@ -322,6 +316,15 @@ public class PulsarRuntimeOperator implements Closeable {
} catch (PulsarClientException e) {
sneakyThrow(e);
return emptyList();
+ } finally {
+ try {
+ // Wait for all the pending messages to be sent to Pulsar.
+ producer.flush();
+ // Closing directly without flushing would drop all the pending messages.
+ producer.close();
+ } catch (PulsarClientException e) {
+ // Just ignore the exception here.
+ }
}
}
@@ -330,9 +333,10 @@ public class PulsarRuntimeOperator implements Closeable {
* message from this topic.
*/
public <T> Message<T> receiveMessage(String topic, Schema<T> schema) {
- try {
- Consumer<T> consumer = createConsumer(topic, schema);
- return drainOneMessage(consumer);
+ try (Consumer<T> consumer = createConsumer(topic, schema)) {
+ Message<T> message = consumer.receive();
+ consumer.acknowledge(message.getMessageId());
+ return message;
} catch (PulsarClientException e) {
sneakyThrow(e);
return null;
@@ -344,10 +348,10 @@ public class PulsarRuntimeOperator implements Closeable {
* timeout. A null message would be returned if no message has been consumed from Pulsar.
*/
public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
- try {
- Consumer<T> consumer = createConsumer(topic, schema);
- Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
- consumer.acknowledgeCumulative(message.getMessageId());
+ try (Consumer<T> consumer = createConsumer(topic, schema)) {
+ Message<T> message =
+ consumer.receive(Math.toIntExact(timeout.toMillis()), MILLISECONDS);
+ consumer.acknowledge(message.getMessageId());
return message;
} catch (Exception e) {
@@ -371,12 +375,12 @@ public class PulsarRuntimeOperator implements Closeable {
return singletonList(message);
} else {
// Drain a fixed number of messages.
- try {
- Consumer<T> consumer = createConsumer(topic, schema);
+ try (Consumer<T> consumer = createConsumer(topic, schema)) {
List<Message<T>> messages = new ArrayList<>(counts);
for (int i = 0; i < counts; i++) {
- Message<T> message = drainOneMessage(consumer);
+ Message<T> message = consumer.receive();
messages.add(message);
+ consumer.acknowledge(message.getMessageId());
}
return messages;
} catch (PulsarClientException e) {
@@ -459,9 +463,6 @@ public class PulsarRuntimeOperator implements Closeable {
/** This method is used for test framework. You can't close this operator manually. */
@Override
public void close() throws IOException {
- producers.clear();
- consumers.clear();
-
if (admin != null) {
admin.close();
}
@@ -474,93 +475,53 @@ public class PulsarRuntimeOperator implements Closeable {
private void createNonPartitionedTopic(String topic) {
try {
- admin().lookups().lookupTopic(topic);
- sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0));
+ admin().topics().createNonPartitionedTopic(topic);
} catch (PulsarAdminException e) {
- sneakyAdmin(() -> admin().topics().createNonPartitionedTopic(topic));
+ if (!(e instanceof ConflictException
+ && e.getMessage().equals("This topic already exists"))) {
+ sneakyThrow(e);
+ }
}
}
private void createPartitionedTopic(String topic, int numberOfPartitions) {
try {
- admin().lookups().lookupPartitionedTopic(topic);
- sneakyAdmin(() -> admin().topics().expireMessagesForAllSubscriptions(topic, 0));
+ admin().topics().createPartitionedTopic(topic, numberOfPartitions);
} catch (PulsarAdminException e) {
- sneakyAdmin(() -> admin().topics().createPartitionedTopic(topic, numberOfPartitions));
+ if (!(e instanceof ConflictException
+ && e.getMessage().equals("This topic already exists"))) {
+ sneakyThrow(e);
+ }
}
}
- @SuppressWarnings("unchecked")
- private <T> Producer<T> createProducer(String topic, Schema<T> schema)
- throws PulsarClientException {
- TopicName topicName = TopicName.get(topic);
- String name = topicName.getPartitionedTopicName();
- int index = topicName.getPartitionIndex();
- ConcurrentHashMap<Integer, Producer<?>> topicProducers =
- producers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());
-
- return (Producer<T>)
- topicProducers.computeIfAbsent(
- index,
- i -> {
- ProducerBuilder<T> builder =
- client().newProducer(schema)
- .topic(topic)
- .enableBatching(false)
- .enableMultiSchema(true);
-
- return sneakyClient(builder::create);
- });
- }
+ private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
+ ProducerBuilder<T> builder =
+ client().newProducer(schema)
+ .topic(topic)
+ .enableBatching(false)
+ .enableMultiSchema(true)
+ .accessMode(Shared);
- @SuppressWarnings("unchecked")
- private <T> Consumer<T> createConsumer(String topic, Schema<T> schema)
- throws PulsarClientException {
- TopicName topicName = TopicName.get(topic);
- String name = topicName.getPartitionedTopicName();
- int index = topicName.getPartitionIndex();
- ConcurrentHashMap<Integer, Consumer<?>> topicConsumers =
- consumers.computeIfAbsent(name, d -> new ConcurrentHashMap<>());
-
- return (Consumer<T>)
- topicConsumers.computeIfAbsent(
- index,
- i -> {
- ConsumerBuilder<T> builder =
- client().newConsumer(schema)
- .topic(topic)
- .subscriptionName(SUBSCRIPTION_NAME)
- .subscriptionMode(Durable)
- .subscriptionType(Exclusive)
- .subscriptionInitialPosition(Earliest);
-
- return sneakyClient(builder::subscribe);
- });
+ return sneakyClient(builder::create);
}
- private void removeProducers(String topic) {
- String topicName = topicName(topic);
- ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
- if (integerProducers != null) {
- for (Producer<?> producer : integerProducers.values()) {
- sneakyClient(producer::close);
- }
+ private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
+ // Create the subscription at the earliest position if it doesn't exist yet.
+ List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
+ if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
+ sneakyAdmin(
+ () -> admin().topics().createSubscription(topic, SUBSCRIPTION_NAME, earliest));
}
- }
- private void removeConsumers(String topic) {
- String topicName = topicName(topic);
- ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
- if (integerConsumers != null) {
- for (Consumer<?> consumer : integerConsumers.values()) {
- sneakyClient(consumer::close);
- }
- }
- }
+ // Create the consumer without the initial position.
+ ConsumerBuilder<T> builder =
+ client().newConsumer(schema)
+ .topic(topic)
+ .subscriptionName(SUBSCRIPTION_NAME)
+ .subscriptionMode(Durable)
+ .subscriptionType(Exclusive);
- private <T> Message<T> drainOneMessage(Consumer<T> consumer) throws PulsarClientException {
- Message<T> message = consumer.receive();
- consumer.acknowledgeCumulative(message.getMessageId());
- return message;
+ return sneakyClient(builder::subscribe);
}
}