Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/20 08:30:47 UTC

[GitHub] [pulsar] equanz opened a new pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

equanz opened a new pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279


   Master Issue: https://github.com/apache/pulsar/wiki/PIP-79%3A-Reduce-redundant-producers-from-partitioned-producer
   
   ### Motivation
   
   Please see the PIP document.
   This PR is one part of the implementation. I will submit a follow-up PR for PartitionedTopicStats later.
   
   ### Modifications
   
   * The partitioned producer connects to partitions lazily
     - lazy connection is supported only in Shared mode, so that the behavior described by [this specification](https://github.com/apache/pulsar/blob/715e588d5293f8053f1a6ed6447baf9a31a3cae9/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java) is preserved
   * Add a routing mode that round-robins over a limited number of partitions
   
   ### Verifying this change
   
   * [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   * Added test for lazy-loading
   * Added test for routing mode class
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   * Does this pull request introduce a new feature? (yes)
   * If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r619978186



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception {
         }
 
         // check producer/consumer auto create partitioned topic
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {

Review comment:
       Why are you modifying an existing test?
   It is okay to refactor and reuse an existing test, but in this case you are probably altering the behaviour of the existing test, and you also cannot be sure that you are testing your feature properly.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       We are not sure that we are removing the newly created producer here.
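
One way to address this concern, as a minimal sketch: remove the entry only if it still maps to the producer that failed. The sketch uses `java.util.concurrent.ConcurrentHashMap` as a stand-in for the map in the PR, and `ProducerRegistrySketch`, `FakeProducer`, and `discardIfCurrent` are hypothetical names, not Pulsar APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the per-partition producer map discussed above.
class ProducerRegistrySketch {
    // Minimal fake producer; it only records whether close() was called.
    static final class FakeProducer {
        volatile boolean closed = false;

        void close() {
            closed = true;
        }
    }

    private final ConcurrentMap<Integer, FakeProducer> producers = new ConcurrentHashMap<>();

    FakeProducer getOrCreate(int partition) {
        return producers.computeIfAbsent(partition, idx -> new FakeProducer());
    }

    // Removes and closes `failed` only if it is still the registered producer.
    // A stale handle therefore cannot evict a producer that replaced it.
    boolean discardIfCurrent(int partition, FakeProducer failed) {
        boolean removed = producers.remove(partition, failed);
        if (removed) {
            failed.close();
        }
        return removed;
    }

    boolean contains(int partition) {
        return producers.containsKey(partition);
    }

    public static void main(String[] args) {
        ProducerRegistrySketch registry = new ProducerRegistrySketch();
        FakeProducer first = registry.getOrCreate(0);
        System.out.println("removed current: " + registry.discardIfCurrent(0, first));
        registry.getOrCreate(0);
        System.out.println("removed stale: " + registry.discardIfCurrent(0, first));
    }
}
```

The two-argument `remove(key, value)` also avoids the `NullPointerException` that `remove(key).close()` would raise when no entry exists.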
   







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-913338113


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-942052839


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-935452803


   @eolivelli Could you please take another look?





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-930302846


   /pulsarbot run-failure-checks





[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r728678301



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.customroute;
+
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.TopicMetadata;
+
+public class PartialRoundRobinMessageRouterImpl implements MessageRouter {
+    private final int numPartitionsLimit;
+    private final List<Integer> partialList = new CopyOnWriteArrayList<>();
+    private static final AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl> PARTITION_INDEX_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class, "partitionIndex");
+    @SuppressWarnings("unused")
+    private volatile int partitionIndex = 0;
+
+    public PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) {
+        this.numPartitionsLimit = numPartitionsLimit;

Review comment:
       It seems better to throw an exception if a value less than or equal to 0 is passed.
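
The suggested check could look roughly like the following. This is a sketch under assumptions: `PartitionLimitSketch` and the message text are illustrative, and the real router class may validate differently.

```java
// Hypothetical sketch of constructor validation; not the PR's actual code.
class PartitionLimitSketch {
    private final int numPartitionsLimit;

    PartitionLimitSketch(final int numPartitionsLimit) {
        // Reject zero and negative limits up front instead of failing later during routing.
        if (numPartitionsLimit <= 0) {
            throw new IllegalArgumentException(
                    "numPartitionsLimit should be greater than 0, got " + numPartitionsLimit);
        }
        this.numPartitionsLimit = numPartitionsLimit;
    }

    int limit() {
        return numPartitionsLimit;
    }

    public static void main(String[] args) {
        System.out.println(new PartitionLimitSketch(3).limit());
        try {
            new PartitionLimitSketch(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```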







[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r709995487



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,97 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");

Review comment:
       I don't understand why sending a message always fails, even though one producer should have succeeded in connecting. Why? 







[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710855728



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -308,39 +348,44 @@ String getHandlerName() {
                     future.complete(null);
                     return future;
                 } else if (oldPartitionNumber < currentPartitionNumber) {
-                    List<CompletableFuture<Producer<T>>> futureList = list
-                        .subList(oldPartitionNumber, currentPartitionNumber)
-                        .stream()
-                        .map(partitionName -> {
-                            int partitionIndex = TopicName.getPartitionIndex(partitionName);
-                            ProducerImpl<T> producer =
-                                new ProducerImpl<>(client,
-                                    partitionName, conf, new CompletableFuture<>(),
-                                    partitionIndex, schema, interceptors);
-                            producers.add(producer);
-                            return producer.producerCreatedFuture();
-                        }).collect(Collectors.toList());
-
-                    FutureUtil.waitForAll(futureList)
-                        .thenAccept(finalFuture -> {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] success create producers for extended partitions. old: {}, new: {}",
-                                    topic, oldPartitionNumber, currentPartitionNumber);
-                            }
-                            topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
-                            future.complete(null);
-                        })
-                        .exceptionally(ex -> {
-                            // error happened, remove
-                            log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}",
-                                topic, oldPartitionNumber, currentPartitionNumber);
-                            List<ProducerImpl<T>> sublist = producers.subList(oldPartitionNumber, producers.size());
-                            sublist.forEach(newProducer -> newProducer.closeAsync());
-                            sublist.clear();
-                            future.completeExceptionally(ex);
-                            return null;
-                        });
-                    return null;
+                    if (conf.isLazyStartPartitionedProducers() && conf.getAccessMode() == ProducerAccessMode.Shared) {
+                        topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
+                        future.complete(null);
+                        return future;
+                    } else {
+                        List<CompletableFuture<Producer<T>>> futureList = list
+                                .subList(oldPartitionNumber, currentPartitionNumber)
+                                .stream()
+                                .map(partitionName -> {
+                                    int partitionIndex = TopicName.getPartitionIndex(partitionName);
+                                    ProducerImpl<T> producer =
+                                            new ProducerImpl<>(client,
+                                                    partitionName, conf, new CompletableFuture<>(),
+                                                    partitionIndex, schema, interceptors);
+                                    producers.put(partitionIndex, producer);

Review comment:
       I've replaced it with `computeIfAbsent`.







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-912437211


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-941020433


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-823135859


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700932705



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
##########
@@ -739,10 +740,14 @@ public void testConcurrentlyDeleteSchema() throws Exception {
         admin.namespaces().createNamespace("prop/ns-delete-schema", 3);
         admin.topics().createPartitionedTopic(topic, partitions);
 
-        Producer producer = pulsarClient
+        Producer<Schemas.BytesRecord> producer = pulsarClient
                 .newProducer(Schema.JSON(Schemas.BytesRecord.class))
                 .topic(topic)
+                .enableBatching(false)

Review comment:
       In this PR, I introduce a lazy-loading feature for internal producers. When an internal producer is selected by the MessageRouter and hasn't been created yet, it is created lazily.
   https://github.com/equanz/pulsar/blob/844cca7cf84b76b7d27f12de4dbf36e23cbf29e9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L193-L211
   Therefore, if a partition isn't selected by the MessageRouter, neither its producer nor its topic is created in this test case.
   
   For that reason, we should add a `Producer#send` call.
   
   When batching is enabled, it is not easy to make the router select every internal producer from the client side.
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java#L82-L84
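
The effect described above can be illustrated with a small model. It is only a sketch: `LazySendSketch` and `FakeProducer` are hypothetical stand-ins, and the router is simplified to advance one partition per message, which is the assumed behavior with batching disabled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a toy model of lazy internal-producer creation, not Pulsar code.
class LazySendSketch {
    static final class FakeProducer {
        final AtomicInteger sent = new AtomicInteger();
    }

    private final int numPartitions;
    private final Map<Integer, FakeProducer> producers = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    LazySendSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Routes round-robin and creates the internal producer on first use.
    int send() {
        int partition = Math.floorMod(counter.getAndIncrement(), numPartitions);
        producers.computeIfAbsent(partition, idx -> new FakeProducer()).sent.incrementAndGet();
        return partition;
    }

    int createdProducers() {
        return producers.size();
    }

    public static void main(String[] args) {
        LazySendSketch sketch = new LazySendSketch(3);
        System.out.println("before any send: " + sketch.createdProducers());
        for (int i = 0; i < 3; i++) {
            sketch.send();
        }
        System.out.println("after one send per partition: " + sketch.createdProducers());
    }
}
```

In this model no producer exists until `send()` routes to its partition, which is why the test needs one send per partition to create every internal producer.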
   







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-908207378


   Rebased onto the current master commit.
   PTAL
   





[GitHub] [pulsar] eolivelli commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r718160863



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.customroute;
+
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.TopicMetadata;
+
+public class PartialRoundRobinMessageRouterImpl implements MessageRouter {
+    private final int numPartitionsLimit;
+    private final List<Integer> partialList = new CopyOnWriteArrayList<>();
+    private static final AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl> PARTITION_INDEX_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class, "partitionIndex");
+    @SuppressWarnings("unused")
+    private volatile int partitionIndex = 0;
+
+    public PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) {
+        this.numPartitionsLimit = numPartitionsLimit;
+    }
+
+    /**
+     * Choose a partition based on the topic metadata.
+     * Key hash routing isn't supported.
+     *
+     * @param msg message
+     * @param metadata topic metadata
+     * @return the partition to route the message.
+     */
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        return getOrCreatePartialList(metadata)
+                .get(signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), partialList.size()));
+    }
+
+    private List<Integer> getOrCreatePartialList(TopicMetadata metadata) {
+        if (partialList.isEmpty()) {
+            partialList.addAll(IntStream.range(0, metadata.numPartitions())

Review comment:
       Here we are not handling concurrency correctly.
   Two threads may enter this branch at the same time.
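
One possible way to close this race, as a sketch under assumptions: build the partial list once under a lock and publish it through a `volatile` field. `PartialRoundRobinSketch` is a hypothetical simplification. It takes the partition count directly, assumes the limit is greater than 0, shuffles to pick the subset (an assumption about the PR's intent), and ignores later partition-count changes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: thread-safe lazy initialization of a limited partition subset.
class PartialRoundRobinSketch {
    private final int numPartitionsLimit; // assumed to be > 0
    private final AtomicInteger index = new AtomicInteger();
    private volatile List<Integer> partialList; // published once, never mutated afterwards

    PartialRoundRobinSketch(int numPartitionsLimit) {
        this.numPartitionsLimit = numPartitionsLimit;
    }

    int choosePartition(int numPartitions) {
        List<Integer> list = getOrCreatePartialList(numPartitions);
        return list.get(Math.floorMod(index.getAndIncrement(), list.size()));
    }

    private List<Integer> getOrCreatePartialList(int numPartitions) {
        List<Integer> list = partialList;
        if (list == null) {
            synchronized (this) {
                list = partialList; // re-check: another thread may have built it already
                if (list == null) {
                    List<Integer> all = new ArrayList<>();
                    for (int i = 0; i < numPartitions; i++) {
                        all.add(i);
                    }
                    Collections.shuffle(all);
                    int size = Math.min(numPartitionsLimit, numPartitions);
                    list = Collections.unmodifiableList(new ArrayList<>(all.subList(0, size)));
                    partialList = list;
                }
            }
        }
        return list;
    }

    public static void main(String[] args) {
        PartialRoundRobinSketch router = new PartialRoundRobinSketch(2);
        for (int i = 0; i < 4; i++) {
            System.out.println(router.choosePartition(5));
        }
    }
}
```

Because the list is fully built before it is assigned, readers never observe a half-filled list, and only one thread can populate it.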

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -68,15 +72,25 @@
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
         super(client, topic, conf, producerCreatedFuture, schema, interceptors);
-        this.producers = Lists.newArrayListWithCapacity(numPartitions);
+        this.producers = new ConcurrentOpenHashMap<>();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
 
+        // MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
         conf.setMaxPendingMessages(maxPendingMessages);
-        start();
+
+        final List<Integer> indexList = conf.isLazyStartPartitionedProducers() &&
+                                            conf.getAccessMode() == ProducerAccessMode.Shared ?

Review comment:
       Can we convert this expression into an if/else block?
   Just to make it easier to read.
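
As a rough illustration of the requested shape: the quoted hunk is cut off before the two branches, so the branch bodies below are assumptions. `StartIndexSketch`, `initialPartitions`, and its parameters are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch only: shows the if/else form, with assumed branch contents.
class StartIndexSketch {
    static List<Integer> initialPartitions(boolean lazyStart, boolean sharedAccess,
                                           int firstPartition, int numPartitions) {
        final List<Integer> indexList;
        if (lazyStart && sharedAccess) {
            // Assumed lazy case: start with a single partition only.
            indexList = Collections.singletonList(firstPartition);
        } else {
            // Assumed eager case: start a producer for every partition.
            indexList = IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
        }
        return indexList;
    }

    public static void main(String[] args) {
        System.out.println(initialPartitions(true, true, 2, 4));
        System.out.println(initialPartitions(false, true, 2, 4));
    }
}
```

A `final` local assigned exactly once in each branch keeps the single-assignment guarantee of the ternary form.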

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -159,7 +170,14 @@ private void start() {
                 return null;
             });
         }
+    }
 
+    private ProducerImpl<T> createProducer(final int partitionIndex) {
+        return producers.computeIfAbsent(partitionIndex, (idx) -> {
+            String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();

Review comment:
       Use `idx` and not `partitionIndex`, in order to create a non-capturing lambda.
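
A minimal sketch of the difference, using `java.util.concurrent.ConcurrentHashMap` as a stand-in for the map in the PR. `PartitionNameCacheSketch` and the topic name are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: the mapping function reads its own parameter, not the enclosing local.
class PartitionNameCacheSketch {
    private static final String PREFIX = "my-topic-partition-"; // hypothetical topic name

    private final ConcurrentMap<Integer, String> names = new ConcurrentHashMap<>();

    String nameFor(final int partitionIndex) {
        // `idx` is the key passed in by computeIfAbsent. Using it instead of
        // `partitionIndex` means the lambda captures no local variable.
        return names.computeIfAbsent(partitionIndex, idx -> PREFIX + idx);
    }

    public static void main(String[] args) {
        PartitionNameCacheSketch cache = new PartitionNameCacheSketch();
        System.out.println(cache.nameFor(3));
    }
}
```

In the PR's lambda other state such as `topic` is still read from the enclosing instance, so switching to `idx` removes only the capture of the local variable.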

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,98 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+

Review comment:
       Please assert that the counter is 1, in order to verify that the mock took effect







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-913283258


   @eolivelli 
   > We should add a configuration option to enable this behavior explicitly
   
   I understand. I'll change the behavior to follow your comment (add a configuration option and disable it by default in this PR). However, I think this configuration is not important for a typical user, so once this feature is stable in a future version, I'd like to enable it by default.
   
   





[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r620971771



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       The reason is to allow the internal producer to be recreated when its creation fails. This behavior is tested [here](https://github.com/apache/pulsar/blob/d49aab48119cb31ab7935abbd38f117ef89cdfc7/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java#L493-L546).
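
A minimal sketch of the remove-on-failure idea described above, not the PR's code. It assumes `java.util.concurrent.ConcurrentHashMap` as a stand-in for the client's `producers` map and plain `CompletableFuture<String>` values in place of internal producers; `RecreateOnFailure` and `getOrCreate` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class RecreateOnFailure {
    // Stand-in for the map of internal producers, keyed by partition index.
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> producers =
            new ConcurrentHashMap<>();

    public CompletableFuture<String> getOrCreate(int partition,
                                                 Supplier<CompletableFuture<String>> factory) {
        CompletableFuture<String> future =
                producers.computeIfAbsent(partition, idx -> factory.get());
        // On failure, drop exactly this entry so that a later call can create a new one.
        // remove(key, value) leaves the map untouched if the entry was already replaced.
        future.whenComplete((producer, error) -> {
            if (error != null) {
                producers.remove(partition, future);
            }
        });
        return future;
    }

    public int size() {
        return producers.size();
    }
}
```

If the failed entry stayed in the map, every later send to that partition would find the broken producer instead of triggering a new creation attempt.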
   







[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710002101



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,97 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");

Review comment:
       Ah, is it because, with the `RoundRobinPartition` routing mode, the first message is always sent to a partition different from the one connected when the producer was created?







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-910207550


   @eolivelli @Vanlightly 
   Could you please review it?
   





[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r709974383



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -159,7 +170,14 @@ private void start() {
                 return null;
             });
         }
+    }
 
+    private ProducerImpl<T> createProducer(final int partitionIndex) {
+        String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();
+        ProducerImpl<T> producer = client.newProducerImpl(partitionName, partitionIndex,
+                conf, schema, interceptors, new CompletableFuture<>());
+        producers.put(partitionIndex, producer);

Review comment:
       Isn't it necessary to add an entry to `producers` using `computeIfAbsent`? I'm wondering if there will be a race condition with multiple threads.
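
To illustrate the concern, here is a minimal sketch, not the PR's code. It uses `java.util.concurrent.ConcurrentHashMap` as a stand-in for the `producers` map, and `LazyRegistry`, `getOrCreate`, and the string values are hypothetical. With a separate `containsKey` check followed by `put`, two threads can both create a producer for the same partition; `computeIfAbsent` on `ConcurrentHashMap` applies the factory at most once per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyRegistry {
    private final ConcurrentHashMap<Integer, String> producers = new ConcurrentHashMap<>();
    // Counts how many times the factory actually ran.
    private final AtomicInteger created = new AtomicInteger();

    public String getOrCreate(int partition) {
        // Check and insert happen atomically, so concurrent callers share one entry.
        return producers.computeIfAbsent(partition, idx -> {
            created.incrementAndGet();
            return "producer-" + idx;
        });
    }

    public int createdCount() {
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyRegistry registry = new LazyRegistry();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> registry.getOrCreate(0));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(registry.createdCount()); // prints 1
    }
}
```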







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-930302846


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r728751188



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PartialPartitionedProducerTest extends ProducerConsumerBase {
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPtWithSinglePartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-with-single-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 1);
+    }
+
+    @Test
+    public void testPtWithPartialPartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    // AddPartitionTest
+    @Test
+    public void testPtLazyLoading() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        final Supplier<Boolean> send = () -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 10);
+    }
+
+    @Test
+    public void testPtLoadingNotSharedMode() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        // create first producer at initialization step
+        assertEquals(producerImplExclusive.getProducers().size(), 10);
+
+        producerImplExclusive.close();
+
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
+    }
+
+    // AddPartitionAndLimitTest
+    @Test
+    public void testPtUpdateWithPartialPartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .accessMode(ProducerAccessMode.Shared)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        final Supplier<Boolean> send = ()-> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 3));
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 4));
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    @Test
+    public void testPtUpdateNotSharedMode() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-update-not-shared");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplExclusive.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImplExclusive)).numPartitions(), 3));
+        assertEquals(producerImplExclusive.getProducers().size(), 3);
+
+        producerImplExclusive.close();
+
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImplWaitForExclusive)).numPartitions(), 4));
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
+
+        producerImplWaitForExclusive.close();

Review comment:
       That's my mistake. I'll add the `@Cleanup` annotation to the producers.







[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700910200



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -113,25 +126,22 @@ private MessageRouter getMessageRouter() {
 
     @Override
     public String getProducerName() {
-        return producers.get(0).getProducerName();
+        return producers.get(firstPartitionIndex).getProducerName();

Review comment:
       > shouldn't be the same producerName for all the producers ?
   
   As I understand it, producerName is currently computed separately for each partition if it isn't set in ProducerBuilder.
   
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1110-L1111
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1364-L1366
   
   > why not computing the name in the constructor and cache it ?
   
   If needed, I'll add logic to compute and cache producerName in the code below.
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L134-L160







[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r728702722



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +191,32 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (conf.isLazyStartPartitionedProducers() && !producers.containsKey(partition)) {
+            final ProducerImpl<T> newProducer = createProducer(partition);
+            final State createState = newProducer.producerCreatedFuture().handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition, newProducer);
+                        newProducer.close();
+                    } catch (PulsarClientException e) {
+                        log.error("[{}] Could not close internal producer. partitionIndex: {}", topic, partition, e);
+                    }
+                    return State.Failed;
+                }
+                log.debug("[{}] Created internal producer. partitionIndex: {}", topic, partition);

Review comment:
       ```suggestion
                   if (log.isDebugEnabled()) {
                       log.debug("[{}] Created internal producer. partitionIndex: {}", topic, partition);
                   }
   ```







[GitHub] [pulsar] eolivelli commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r718160863



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.customroute;
+
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.TopicMetadata;
+
+public class PartialRoundRobinMessageRouterImpl implements MessageRouter {
+    private final int numPartitionsLimit;
+    private final List<Integer> partialList = new CopyOnWriteArrayList<>();
+    private static final AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl> PARTITION_INDEX_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class, "partitionIndex");
+    @SuppressWarnings("unused")
+    private volatile int partitionIndex = 0;
+
+    public PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) {
+        this.numPartitionsLimit = numPartitionsLimit;
+    }
+
+    /**
+     * Choose a partition based on the topic metadata.
+     * Key hash routing isn't supported.
+     *
+     * @param msg message
+     * @param metadata topic metadata
+     * @return the partition to route the message.
+     */
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        return getOrCreatePartialList(metadata)
+                .get(signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), partialList.size()));
+    }
+
+    private List<Integer> getOrCreatePartialList(TopicMetadata metadata) {
+        if (partialList.isEmpty()) {
+            partialList.addAll(IntStream.range(0, metadata.numPartitions())

Review comment:
       Here we are not handling concurrency correctly.
   Two threads may enter this branch at the same time.
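
One possible way to close this race, sketched under assumptions and not taken from the PR: build the list once under a lock and publish it through a `volatile` field as an immutable list. `PartialRoundRobinSketch` is a hypothetical, dependency-free stand-in for the router; it takes the partition count directly instead of `TopicMetadata`, uses `Math.floorMod` in place of `signSafeMod`, assumes the subset is chosen by shuffling, and ignores later partition-count updates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PartialRoundRobinSketch {
    private final int limit;
    private final AtomicInteger counter = new AtomicInteger();
    // Published once and never mutated afterwards.
    private volatile List<Integer> partialList;

    public PartialRoundRobinSketch(int limit) {
        this.limit = limit;
    }

    public int choosePartition(int numPartitions) {
        List<Integer> list = getOrCreatePartialList(numPartitions);
        // floorMod keeps the index non-negative even after the counter overflows.
        return list.get(Math.floorMod(counter.getAndIncrement(), list.size()));
    }

    private List<Integer> getOrCreatePartialList(int numPartitions) {
        List<Integer> list = partialList;
        if (list == null) {
            synchronized (this) {
                list = partialList;
                if (list == null) { // only the first thread builds the list
                    List<Integer> all = new ArrayList<>();
                    for (int i = 0; i < numPartitions; i++) {
                        all.add(i);
                    }
                    Collections.shuffle(all);
                    list = Collections.unmodifiableList(
                            new ArrayList<>(all.subList(0, Math.min(limit, numPartitions))));
                    partialList = list;
                }
            }
        }
        return list;
    }
}
```

With an `isEmpty()` check followed by `addAll` on a shared `CopyOnWriteArrayList`, two threads that both see an empty list would each append their own subset, so the list could end up larger than the limit.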

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -68,15 +72,25 @@
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
         super(client, topic, conf, producerCreatedFuture, schema, interceptors);
-        this.producers = Lists.newArrayListWithCapacity(numPartitions);
+        this.producers = new ConcurrentOpenHashMap<>();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
 
+        // MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
         conf.setMaxPendingMessages(maxPendingMessages);
-        start();
+
+        final List<Integer> indexList = conf.isLazyStartPartitionedProducers() &&
+                                            conf.getAccessMode() == ProducerAccessMode.Shared ?

Review comment:
       Can we convert this expression into an if/else block?
   Just to make it easier to read.
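
A rough sketch of what the if/else form could look like. The quoted hunk cuts off before the two branches of the ternary, so the branch bodies here are assumptions inferred from the tests in this PR (one producer at start in lazy Shared mode, all partitions otherwise). `InitialIndexes`, its local `AccessMode` enum, and the `firstPartitionIndex` parameter are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class InitialIndexes {
    // Local stand-in for the client's access mode enum.
    public enum AccessMode { Shared, Exclusive, WaitForExclusive }

    public static List<Integer> initialIndexes(boolean lazyStart, AccessMode mode,
                                               int firstPartitionIndex, int numPartitions) {
        final List<Integer> indexList;
        if (lazyStart && mode == AccessMode.Shared) {
            // Assumed: lazy start connects to a single partition up front.
            indexList = Collections.singletonList(firstPartitionIndex);
        } else {
            // Assumed: otherwise connect to every partition, as before the change.
            indexList = IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
        }
        return indexList;
    }
}
```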

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -159,7 +170,14 @@ private void start() {
                 return null;
             });
         }
+    }
 
+    private ProducerImpl<T> createProducer(final int partitionIndex) {
+        return producers.computeIfAbsent(partitionIndex, (idx) -> {
+            String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();

Review comment:
       Use `idx` rather than `partitionIndex`, in order to create a non-capturing lambda
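
A small sketch of the difference, using `java.util.concurrent.ConcurrentHashMap` and hypothetical names (`LambdaCapture`, `NAMES`). A lambda that reads only its own parameter captures nothing, and JVMs can typically reuse a single instance of it, whereas a lambda that reads an enclosing local variable has to carry that value along. Note that in the PR's method the lambda presumably still refers to instance fields such as `topic`, so it would capture `this` either way; using `idx` only avoids the extra captured variable.

```java
import java.util.concurrent.ConcurrentHashMap;

public class LambdaCapture {
    private static final ConcurrentHashMap<Integer, String> NAMES = new ConcurrentHashMap<>();

    // Capturing: the lambda body reads the enclosing parameter partitionIndex.
    public static String capturing(final int partitionIndex) {
        return NAMES.computeIfAbsent(partitionIndex, idx -> "topic-partition-" + partitionIndex);
    }

    // Non-capturing: the lambda body uses only its own argument idx.
    public static String nonCapturing(final int partitionIndex) {
        return NAMES.computeIfAbsent(partitionIndex, idx -> "topic-partition-" + idx);
    }
}
```

Both methods return the same value for the same key; the difference is only in what the lambda object has to hold on to.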

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,98 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+

Review comment:
       Please assert that the counter is 1, in order to verify that the mock took effect
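
The point of the extra assertion can be shown with a dependency-free sketch. Everything here is hypothetical (`MockCounterCheck`, `createLazily`, and the `IntPredicate` standing in for the mock broker handler): catching the expected exception alone does not prove the mock handler was reached exactly once, so the counter is checked as well.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

public class MockCounterCheck {
    // Hypothetical lazy creation: connects to one partition and fails fast if rejected.
    public static void createLazily(IntPredicate handler, int firstPartition) {
        if (!handler.test(firstPartition)) {
            throw new IllegalStateException("authorization error");
        }
    }

    public static void main(String[] args) {
        AtomicInteger producerCounter = new AtomicInteger();
        // Mock handler: reject the first request, accept any later one.
        IntPredicate handler = partition -> producerCounter.incrementAndGet() != 1;

        boolean failed = false;
        try {
            createLazily(handler, 0);
        } catch (IllegalStateException e) {
            failed = true;
        }
        // Check both the failure and the number of requests the mock saw.
        if (!failed || producerCounter.get() != 1) {
            throw new AssertionError("mock did not take effect as expected");
        }
        System.out.println("counter=" + producerCounter.get()); // prints counter=1
    }
}
```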







[GitHub] [pulsar] nkurihar commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
nkurihar commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-880433146


   @sijie @eolivelli @merlimat 
   Could you please review this PR?
   
   Though there are conflicts, we want to confirm the motivation and solution make sense before resolving conflicts.





[GitHub] [pulsar] Vanlightly commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
Vanlightly commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-893465622


   I have submitted a PR for lazy producers in the C++ client (https://github.com/apache/pulsar/pull/11570); I did the work before seeing PIP-79. The approach I took was to create the producers without starting the lookup and connection procedure. So the collection of producers is as big as the number of partitions, and changes when the number of partitions changes, but the lookup and connect procedure is only started on a producer's first message. It does not block in sendAsync but kicks off the procedure asynchronously, allowing messages to be buffered until the producer is connected. In the C++ client, deadlocks were an issue, so I avoided any blocking code.
   
   I don't know if that approach makes sense for the Java client as I have not studied how the Java client works in detail yet. But that seems to be the main difference between my C++ changes and this Java client change.
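
The approach described above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions, not code from either PR: `LazyProducer`, `connectionsAfterSendingTo` and the simulated connect step are hypothetical names, and a real client would perform the topic lookup and broker connection where the comment indicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyPartitionSketch {

    /** Hypothetical stand-in for a per-partition producer; not a Pulsar class. */
    static final class LazyProducer {
        private final int partition;
        private final AtomicInteger connectAttempts;
        private CompletableFuture<Void> connected; // stays null until the first send

        LazyProducer(int partition, AtomicInteger connectAttempts) {
            this.partition = partition;
            this.connectAttempts = connectAttempts;
        }

        // Starts the simulated connection on the first call only, and chains the
        // send behind it instead of blocking the caller.
        synchronized CompletableFuture<String> sendAsync(String payload) {
            if (connected == null) {
                connectAttempts.incrementAndGet();
                connected = CompletableFuture.runAsync(() -> {
                    // a real client would do the topic lookup and connect here
                });
            }
            return connected.thenApply(ignored -> "partition-" + partition + ":" + payload);
        }
    }

    /** Creates one producer object per partition, sends to one partition, returns the number of connection attempts. */
    static int connectionsAfterSendingTo(int numPartitions, int targetPartition, int messages) {
        AtomicInteger attempts = new AtomicInteger();
        List<LazyProducer> producers = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            producers.add(new LazyProducer(i, attempts));
        }
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int m = 0; m < messages; m++) {
            pending.add(producers.get(targetPartition).sendAsync("msg-" + m));
        }
        pending.forEach(CompletableFuture::join); // wait only at the very end
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(connectionsAfterSendingTo(4, 2, 3));
    }
}
```

In this sketch all four producer objects exist up front, but only the partition that actually receives messages triggers a connection attempt, and it does so once regardless of how many messages are sent.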
   





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-908169782


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-921453246


   /pulsarbot run-failure-checks





[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700948961



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
##########
@@ -863,7 +863,7 @@ public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions()
                 .messageRouter(new MessageRouter() {
                     @Override
                     public int choosePartition(Message<?> msg, TopicMetadata metadata) {
-                        return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
+                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) % metadata.numPartitions() : 0;

Review comment:
       I'll add this behavior for authn/authz (please see also https://github.com/apache/pulsar/pull/11570#issuecomment-897463605 ): the first partition index is elected by the MessageRouter using a blank message.
   https://github.com/equanz/pulsar/blob/844cca7cf84b76b7d27f12de4dbf36e23cbf29e9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L87-L88
   
   If we shouldn't change this test and behavior, the alternatives are:
   
   1. add a pseudo message key such as `xxx` to the blank message
   2. handle exceptions in PartitionedProducerImpl
   3. select the first partition index without the MessageRouter (e.g. select randomly)
      - if the message router elects partitions from only a subset of the partitions, as in the SinglePartition routing mode, this change creates a redundant producer
   4. etc.
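
The guard in the test diff above can be sketched in isolation as follows. This is a simplified illustration, not Pulsar's API: the real `MessageRouter.choosePartition` takes a `Message` and a `TopicMetadata`, whereas the hypothetical `choosePartition` below takes a nullable key string and a partition count directly.

```java
final class KeyRouterSketch {

    /**
     * Hypothetical, simplified router: parses the key as an integer when one is
     * present, and falls back to partition 0 for a message without a key
     * (such as a blank message used only to elect the first partition).
     */
    static int choosePartition(String key, int numPartitions) {
        return key != null ? Integer.parseInt(key) % numPartitions : 0;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition("7", 4));  // keyed message
        System.out.println(choosePartition(null, 4)); // blank message, no key
    }
}
```

Without the null check, `Integer.parseInt(null)` would throw a `NumberFormatException`, which is the kind of failure a custom router has to tolerate once it can be invoked with a blank message.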







[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r620971709



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception {
         }
 
         // check producer/consumer auto create partitioned topic
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {

Review comment:
       > Why are you modifying an existing test?
   
   Because this feature modifies existing behavior. Currently, a partitioned producer connects to all partitions, and some test cases assume that "a partitioned producer connects to all partitions (and also creates all partitions automatically)".
   In this change, I switched this procedure to lazy loading and updated the test cases to match ([here](https://github.com/apache/pulsar/pull/10279/files#diff-5277af06358fe0abda3ede298643c934416cbfec27ab0e77d4c924320cd08e8b) is part of the test code for this feature).
   
   If we should not change existing behavior, I'll change this feature so that it can be disabled.
   

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       The reason is so that the internal producer can be recreated when producer creation fails. This behavior is tested [here](https://github.com/apache/pulsar/pull/10279/files#diff-eee943fd1468c4cbe8922b01a487f153480b9ccbaa9606f114bb0bb373ba5859R493-R546).
   







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-941016346


   /pulsarbot run-failure-checks
   





[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700910200



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -113,25 +126,22 @@ private MessageRouter getMessageRouter() {
 
     @Override
     public String getProducerName() {
-        return producers.get(0).getProducerName();
+        return producers.get(firstPartitionIndex).getProducerName();

Review comment:
       > Shouldn't the producerName be the same for all the producers?
   
   As I understand it, producerName is currently computed separately for each partition if it is not set on the ProducerBuilder.
   
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1110-L1111
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1364-L1366
   
   > Why not compute the name in the constructor and cache it?
   
   I'll add logic that computes and caches producerName in the code below.
   https://github.com/apache/pulsar/blob/fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java#L134-L160







[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710671150



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,97 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");

Review comment:
       > is it because when the routing mode is RoundRobinPartition, a partition different from the one when the producer was created is always selected when sending the first message?
   
   Yes. 
   







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-930707822


   Addressed above comments.
   





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-915044681


   Addressed your comments. PTAL.
   





[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710793214



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -308,39 +348,44 @@ String getHandlerName() {
                     future.complete(null);
                     return future;
                 } else if (oldPartitionNumber < currentPartitionNumber) {
-                    List<CompletableFuture<Producer<T>>> futureList = list
-                        .subList(oldPartitionNumber, currentPartitionNumber)
-                        .stream()
-                        .map(partitionName -> {
-                            int partitionIndex = TopicName.getPartitionIndex(partitionName);
-                            ProducerImpl<T> producer =
-                                new ProducerImpl<>(client,
-                                    partitionName, conf, new CompletableFuture<>(),
-                                    partitionIndex, schema, interceptors);
-                            producers.add(producer);
-                            return producer.producerCreatedFuture();
-                        }).collect(Collectors.toList());
-
-                    FutureUtil.waitForAll(futureList)
-                        .thenAccept(finalFuture -> {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] success create producers for extended partitions. old: {}, new: {}",
-                                    topic, oldPartitionNumber, currentPartitionNumber);
-                            }
-                            topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
-                            future.complete(null);
-                        })
-                        .exceptionally(ex -> {
-                            // error happened, remove
-                            log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}",
-                                topic, oldPartitionNumber, currentPartitionNumber);
-                            List<ProducerImpl<T>> sublist = producers.subList(oldPartitionNumber, producers.size());
-                            sublist.forEach(newProducer -> newProducer.closeAsync());
-                            sublist.clear();
-                            future.completeExceptionally(ex);
-                            return null;
-                        });
-                    return null;
+                    if (conf.isLazyStartPartitionedProducers() && conf.getAccessMode() == ProducerAccessMode.Shared) {
+                        topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
+                        future.complete(null);
+                        return future;
+                    } else {
+                        List<CompletableFuture<Producer<T>>> futureList = list
+                                .subList(oldPartitionNumber, currentPartitionNumber)
+                                .stream()
+                                .map(partitionName -> {
+                                    int partitionIndex = TopicName.getPartitionIndex(partitionName);
+                                    ProducerImpl<T> producer =
+                                            new ProducerImpl<>(client,
+                                                    partitionName, conf, new CompletableFuture<>(),
+                                                    partitionIndex, schema, interceptors);
+                                    producers.put(partitionIndex, producer);

Review comment:
       Shouldn't this be replaced with `computeIfAbsent`? For now, there seems to be no conflict between this code and other code paths, but that may not be the case in the future.
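
As a minimal sketch of that suggestion, assuming the producers were held in a `java.util.concurrent.ConcurrentHashMap` (the map type actually used in the PR may differ), `computeIfAbsent` creates the value only when the key has no entry yet. The `producersCreated` helper and the string "producers" are hypothetical stand-ins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ComputeIfAbsentSketch {

    /** Requests a producer for partition 0 several times and returns how many were actually created. */
    static int producersCreated(int attempts) {
        ConcurrentHashMap<Integer, String> producers = new ConcurrentHashMap<>();
        AtomicInteger created = new AtomicInteger();
        for (int i = 0; i < attempts; i++) {
            // The mapping function runs only if partition 0 has no entry yet,
            // so an existing producer is never silently overwritten as with put().
            producers.computeIfAbsent(0, partition -> {
                created.incrementAndGet();
                return "producer-for-partition-" + partition;
            });
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println(producersCreated(3));
    }
}
```

With `put`, a second code path that later creates a producer for the same partition would replace the first one and leave it unclosed; `computeIfAbsent` avoids that case.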







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-921521227


   @massakam 
   Thank you for your comments. I have addressed your comments.
   PTAL
   
   @eolivelli 
   Could you please take another look?
   





[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-942052839









[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r620971709



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception {
         }
 
         // check producer/consumer auto create partitioned topic
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {

Review comment:
       > Why are you modifying an existing test?
   
   Because this feature [modifies existing behavior](https://github.com/apache/pulsar/wiki/PIP-79%3A-Reduce-redundant-producers-from-partitioned-producer#compatibility-deprecation-and-migration-plan). Currently, a partitioned producer connects to all partitions, and some test cases assume that "a partitioned producer connects to all partitions (and also creates all partitions automatically)".
   In this change, I switched this procedure to lazy loading and updated the test cases to match ([here](https://github.com/apache/pulsar/pull/10279/files#diff-5277af06358fe0abda3ede298643c934416cbfec27ab0e77d4c924320cd08e8b) is part of the test code for this feature).
   
   If we should not change existing behavior, I'll change this feature so that it can be disabled.
   







[GitHub] [pulsar] eolivelli commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r619978186



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception {
         }
 
         // check producer/consumer auto create partitioned topic
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {

Review comment:
       Why are you modifying an existing test?
   It is okay to refactor and reuse an existing test, but in this case you are probably altering the behaviour of the existing test, and you also cannot be sure that you are testing your feature properly.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       We cannot be sure that we are removing the newly created producer here.
   







[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r698275971



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -113,25 +126,22 @@ private MessageRouter getMessageRouter() {
 
     @Override
     public String getProducerName() {
-        return producers.get(0).getProducerName();
+        return producers.get(firstPartitionIndex).getProducerName();

Review comment:
       This change breaks existing behavior. If it is not approved, I'll change it to create the `partition-0` producer for backward compatibility.
   







[GitHub] [pulsar] eolivelli commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700189337



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
##########
@@ -57,8 +58,17 @@ public void testPartitionedTopicAutoCreation() throws PulsarAdminException, Puls
 
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
+                .enableBatching(false)
                 .create();
 
+        for (int i = 0; i < 3; i++) {
+            try {
+                producer.newMessage().value("msg".getBytes()).send();
+            } catch (Throwable e) {

Review comment:
       the test will fail anyway, no need to catch the exception

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -924,13 +936,25 @@ public void testRemoveMaxProducers() throws Exception {
         Integer maxProducers = 2;
         log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {
+            for (int i = 0; i < 2; i++) {
+                try {
+                    p.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {

Review comment:
       same here

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -113,25 +126,22 @@ private MessageRouter getMessageRouter() {
 
     @Override
     public String getProducerName() {
-        return producers.get(0).getProducerName();
+        return producers.get(firstPartitionIndex).getProducerName();

Review comment:
       Shouldn't the producerName be the same for all the producers?
   Why not compute the name in the constructor and cache it?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -418,7 +418,11 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
         long resetTimeInMillis = TimeUnit.SECONDS
                 .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
         admin.topics().createPartitionedTopic(topicName, partitions);
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();

Review comment:
       Why do you need to change this test (`enableBatching(false)`)?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       I mean that we should compare the result of `producers.remove(partition)` with `prod` for equality (`==`).
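
One way to read the suggestion above is as a conditional remove: drop the map entry only if it still points at the producer that failed. The sketch below is not the PR's code. `StubProducer`, `ConditionalRemoveSketch`, and `removeIfSame` are hypothetical names, and it assumes the producers live in a `ConcurrentMap`. It relies on the standard `ConcurrentMap.remove(key, value)` call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for an internal per-partition producer (not a Pulsar class).
final class StubProducer implements AutoCloseable {
    private volatile boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class ConditionalRemoveSketch {
    // Removes and closes the entry for `partition` only if it is still mapped to `expected`.
    static boolean removeIfSame(ConcurrentMap<Integer, StubProducer> producers,
                                int partition, StubProducer expected) {
        // remove(key, value) is atomic and compares values with equals(). StubProducer
        // does not override equals(), so this amounts to an identity (==) check.
        boolean removed = producers.remove(partition, expected);
        if (removed) {
            expected.close();
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, StubProducer> producers = new ConcurrentHashMap<>();
        StubProducer failed = new StubProducer();
        StubProducer replacement = new StubProducer();
        producers.put(0, replacement); // assume the entry was already replaced elsewhere

        System.out.println(removeIfSame(producers, 0, failed));      // false: entry is kept
        System.out.println(removeIfSame(producers, 0, replacement)); // true: entry is removed
    }
}
```

With this shape, a failure handler cannot close a producer that a concurrent send has already put in place of the failed one.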

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -894,15 +895,26 @@ public void testGetMaxProducerApplied() throws Exception {
     public void testSetMaxProducers() throws Exception {
         Integer maxProducers = 2;
         log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {
+            for (int i = 0; i < 2; i++) {
+                try {
+                    p.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    log.info("Exception: ", e);
+                    fail();

Review comment:
       what about 
   
   ```
   catch (Exception e) {
               log.info("Exception: ", e);
               throw new RuntimeException(e);
   }
   ```

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
##########
@@ -739,10 +740,14 @@ public void testConcurrentlyDeleteSchema() throws Exception {
         admin.namespaces().createNamespace("prop/ns-delete-schema", 3);
         admin.topics().createPartitionedTopic(topic, partitions);
 
-        Producer producer = pulsarClient
+        Producer<Schemas.BytesRecord> producer = pulsarClient
                 .newProducer(Schema.JSON(Schemas.BytesRecord.class))
                 .topic(topic)
+                .enableBatching(false)

Review comment:
       Why do you need to change this test?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
##########
@@ -863,7 +863,7 @@ public void testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions()
                 .messageRouter(new MessageRouter() {
                     @Override
                     public int choosePartition(Message<?> msg, TopicMetadata metadata) {
-                        return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
+                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) % metadata.numPartitions() : 0;

Review comment:
       This change looks unrelated to this patch; please revert it.

##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
##########
@@ -182,7 +182,7 @@ public void testPublishCompactAndConsumePartitionedTopics(Supplier<String> servi
                 .messageRouter(new MessageRouter() {
                     @Override
                     public int choosePartition(Message<?> msg, TopicMetadata metadata) {
-                        return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
+                        return msg.hasKey() ? Integer.parseInt(msg.getKey()) % metadata.numPartitions() : 0;

Review comment:
       Why do you need to change this test?







[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r710675542



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
##########
@@ -462,8 +463,97 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it should close all successful connections of
-    // other producers and fail
+    // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException.AuthorizationException);
+        }
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");

Review comment:
       I got it. I think it's better to explicitly set `RoundRobinPartition` as the routing mode here.
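
To illustrate why the routing mode matters for a lazy-start test: which internal producers get created depends on which partitions the router picks. The sketch below is a simplified model, not `PartitionedProducerImpl`. `LazyRoundRobinSketch` is a hypothetical class, and it assumes partition 0 is connected eagerly and that round-robin starts at partition 0.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyRoundRobinSketch {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();
    // partition index -> name of the (simulated) internal producer
    private final Map<Integer, String> producers = new ConcurrentHashMap<>();

    LazyRoundRobinSketch(int numPartitions) {
        this.numPartitions = numPartitions;
        // Assumption: exactly one partition (here partition 0) is connected up front.
        producers.put(0, "producer-0");
    }

    // Picks the next partition round-robin and creates its producer on first use.
    int send() {
        int partition = Math.floorMod(counter.getAndIncrement(), numPartitions);
        producers.computeIfAbsent(partition, p -> "producer-" + p);
        return partition;
    }

    int connectedProducers() {
        return producers.size();
    }

    public static void main(String[] args) {
        LazyRoundRobinSketch sketch = new LazyRoundRobinSketch(10);
        System.out.println(sketch.connectedProducers()); // 1
        for (int i = 0; i < 10; i++) {
            sketch.send();
        }
        System.out.println(sketch.connectedProducers()); // 10
    }
}
```

Under a router that always picks the same partition, the count would stay at 1, so a test that expects a second producer to be created on send depends on the routing mode being round-robin.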







[GitHub] [pulsar] equanz commented on pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
equanz commented on pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#issuecomment-911509151


   @eolivelli 
   Thank you for your comments! I'll address them.
   Could you please check my replies?
   





[GitHub] [pulsar] massakam commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r728672520



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PartialPartitionedProducerTest extends ProducerConsumerBase {
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPtWithSinglePartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-with-single-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 1);
+    }
+
+    @Test
+    public void testPtWithPartialPartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    // AddPartitionTest
+    @Test
+    public void testPtLazyLoading() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        final Supplier<Boolean> send = () -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 10);
+    }
+
+    @Test
+    public void testPtLoadingNotSharedMode() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        // create first producer at initialization step
+        assertEquals(producerImplExclusive.getProducers().size(), 10);
+
+        producerImplExclusive.close();
+
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
+    }
+
+    // AddPartitionAndLimitTest
+    @Test
+    public void testPtUpdateWithPartialPartition() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .accessMode(ProducerAccessMode.Shared)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        final Supplier<Boolean> send = ()-> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 3));
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 4));
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    @Test
+    public void testPtUpdateNotSharedMode() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-update-not-shared");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplExclusive.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImplExclusive)).numPartitions(), 3));
+        assertEquals(producerImplExclusive.getProducers().size(), 3);
+
+        producerImplExclusive.close();
+
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) field.get(producerImplWaitForExclusive)).numPartitions(), 4));
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
+
+        producerImplWaitForExclusive.close();

Review comment:
       In `PartialPartitionedProducerTest`, why do some tests close the used producers and some do not?
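
One way to make cleanup uniform across the tests is try-with-resources, which closes the producer even when the test body throws. Lombok's `@Cleanup`, already used in `ClientErrorsTest` in this PR, is another option. The sketch below uses hypothetical stand-ins (`CloseSketch`, `StubCloseableProducer`) rather than the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseSketch {
    // Records which producers were closed, so the effect is observable.
    static final List<String> CLOSED = new ArrayList<>();

    // Hypothetical stand-in for a producer; only close() matters here.
    static final class StubCloseableProducer implements AutoCloseable {
        private final String name;

        StubCloseableProducer(String name) {
            this.name = name;
        }

        void send(String msg) {
            if (msg.isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
        }

        @Override
        public void close() {
            CLOSED.add(name);
        }
    }

    // The producer is closed when the block exits, normally or with an exception.
    static void runTestBody(String name, String msg) {
        try (StubCloseableProducer producer = new StubCloseableProducer(name)) {
            producer.send(msg);
        }
    }
}
```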



