Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/27 08:15:12 UTC

[GitHub] [pulsar] equanz commented on a change in pull request #10279: [PIP 79][client] Add lazy-loading feature to PartitionedProducer

equanz commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r620971709



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
##########
@@ -1521,6 +1522,17 @@ public void testMaxTopicsPerNamespace() throws Exception {
         }
 
         // check producer/consumer auto create partitioned topic
+        final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {

Review comment:
       > Why are you modifying an existing test?
   
   Because this feature changes existing behavior. Currently, a partitioned producer connects to all partitions. Some test cases assume that a partitioned producer connects to all partitions (and also creates all partitions automatically).
   In this change, I made that procedure lazy-loading and updated the test cases to match ([here](https://github.com/apache/pulsar/pull/10279/files#diff-5277af06358fe0abda3ede298643c934416cbfec27ab0e77d4c924320cd08e8b) is part of the test code for this feature).
   
   If we should not change the existing behavior, I'll change this feature so that it can be disabled.
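
To make the behavior change concrete, here is a minimal sketch of lazy per-partition creation. It is not the PR's code and does not use the Pulsar client: `LazyPartitionSketch`, `PartitionHandle`, and `createdCount` are hypothetical names, and the handle only formats a string. The assumption is simply that nothing is created in the constructor and each partition's handle is created on the first send routed to it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a partitioned producer; NOT Pulsar's PartitionedProducerImpl.
class LazyPartitionSketch {

    // Hypothetical per-partition handle; a real one would hold a broker connection.
    static final class PartitionHandle {
        private final int partition;

        PartitionHandle(int partition) {
            this.partition = partition;
        }

        String send(String payload) {
            return "p" + partition + ":" + payload;
        }
    }

    private final int numPartitions;
    private final Map<Integer, PartitionHandle> handles = new ConcurrentHashMap<>();

    LazyPartitionSketch(int numPartitions) {
        // An eager variant would create one handle per partition here.
        // The lazy variant creates nothing until a message is sent.
        this.numPartitions = numPartitions;
    }

    String send(int partition, String payload) {
        if (partition < 0 || partition >= numPartitions) {
            throw new IllegalArgumentException("Illegal partition index: " + partition);
        }
        // Create the handle only on first use of this partition.
        PartitionHandle handle = handles.computeIfAbsent(partition, PartitionHandle::new);
        return handle.send(payload);
    }

    // Number of partitions that currently have a handle.
    int createdCount() {
        return handles.size();
    }

    public static void main(String[] args) {
        LazyPartitionSketch producer = new LazyPartitionSketch(4);
        System.out.println("handles before send: " + producer.createdCount());
        System.out.println(producer.send(2, "hello"));
        producer.send(2, "again");
        System.out.println("handles after two sends to partition 2: " + producer.createdCount());
    }
}
```

A test that previously assumed every partition was connected (or auto-created) right after the producer was built would, under this model, have to send at least one message to each partition first, which matches the kind of test change described above.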
   

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: " + partition);
+
+        if (!producers.containsKey(partition)) {
+            final State createState = createProducer(partition).handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition).close();

Review comment:
       The reason for this is so that the internal producer can be recreated when producer creation fails. This behavior is tested [here](https://github.com/apache/pulsar/pull/10279/files#diff-eee943fd1468c4cbe8922b01a487f153480b9ccbaa9606f114bb0bb373ba5859R493-R546).
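
The idea of dropping a failed entry so that a later send can retry creation can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `RetryOnFailedCreateSketch`, `getOrCreate`, and the `String` "producer" are hypothetical, and the factory is any function that returns a `CompletableFuture`. Unlike the diff above, the sketch only removes the map entry and does not close anything.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical sketch; the "producer" is just a String and the factory is supplied by the caller.
class RetryOnFailedCreateSketch {

    private final Map<Integer, CompletableFuture<String>> producers = new ConcurrentHashMap<>();
    private final IntFunction<CompletableFuture<String>> factory;

    RetryOnFailedCreateSketch(IntFunction<CompletableFuture<String>> factory) {
        this.factory = factory;
    }

    CompletableFuture<String> getOrCreate(int partition) {
        CompletableFuture<String> future = producers.computeIfAbsent(partition, factory::apply);
        // Attach the cleanup outside computeIfAbsent: if the future is already
        // failed, the callback runs immediately and must not touch the map
        // from inside the mapping function.
        future.whenComplete((producer, error) -> {
            if (error != null) {
                // Remove only this failed future, so the next call creates a new one.
                producers.remove(partition, future);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        RetryOnFailedCreateSketch sketch = new RetryOnFailedCreateSketch(p -> {
            if (attempts.incrementAndGet() == 1) {
                // First creation attempt fails.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("create failed"));
                return failed;
            }
            return CompletableFuture.completedFuture("producer-" + p);
        });

        System.out.println("first attempt failed: " + sketch.getOrCreate(0).isCompletedExceptionally());
        System.out.println("second attempt: " + sketch.getOrCreate(0).join());
        System.out.println("creation attempts: " + attempts.get());
    }
}
```

Using the two-argument `remove(key, value)` is a design choice in this sketch: it avoids deleting a newer, healthy entry if another thread has already replaced the failed one.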
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org