You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 03:26:18 UTC

[pulsar] branch branch-2.6 updated: support websocket producer for v2 topic (#8535)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 81d5caa  support websocket producer for v2 topic (#8535)
81d5caa is described below

commit 81d5caa8d939a29d3451766baa434fc06d5b5360
Author: k2la <mz...@gmail.com>
AuthorDate: Fri Nov 13 17:11:25 2020 +0900

    support websocket producer for v2 topic (#8535)
    
    ### Motivation
    - V2 topics are not supported.
    - It does not work because `/` is missing between `"ws/producer"` and `topicName`.
    https://github.com/apache/pulsar/blob/1c7b12d379cf36183d60919dcaff75e4d6cf63f2/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java#L169
    
    ### Modifications
    - When a V2 topic is used, use "ws/v2/producer/".
    
    (cherry picked from commit 67f544cdaa34390016c52534886573e8ffad0177)
---
 .../org/apache/pulsar/proxy/socket/client/PerformanceClient.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 531918e..107791b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -36,9 +36,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,7 +168,9 @@ public class PerformanceClient {
             String topicName, String authPluginClassName, String authParams) throws InterruptedException, FileNotFoundException {
         ExecutorService executor = Executors.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec"));
         HashMap<String, Tuple> producersMap = new HashMap<>();
-        String produceBaseEndPoint = baseUrl + "ws/producer" + topicName;
+        String restPath = TopicName.get(topicName).getRestPath();
+        String produceBaseEndPoint = TopicName.get(topicName).isV2() ?
+                baseUrl + "ws/v2/producer/" + restPath : baseUrl + "ws/producer/" + restPath;
         for (int i = 0; i < numOfTopic; i++) {
             String topic = numOfTopic > 1 ? produceBaseEndPoint + String.valueOf(i) : produceBaseEndPoint;
             URI produceUri = URI.create(topic);