Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/25 01:08:07 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #11085: [WIP] Support socks5 proxy

Technoboy- opened a new pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085


   ### Motivation
   
   Some users' networks sit behind a SOCKS5 proxy, so this change adds a SOCKS5 proxy handler on the client side to support the SOCKS5 protocol.
   
   ### Modifications
   
   Add `Socks5ProxyHandler` to `ConnectionPool` as the first handler for producers and consumers.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it **add** or upgrade a dependency): (yes, add)
     - The public API: (yes)
     - The schema: (no)
     - The default values of configurations: (yes)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
   
   ### How to use
   
   ```java
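   import java.net.InetSocketAddress;
   import org.apache.pulsar.client.api.PulsarClient;

   PulsarClient client = PulsarClient.builder()
           .serviceUrl("pulsar://service-url:6650")
           // Change to the SOCKS5 proxy host and port.
           .socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
           // Set the username and password only if the proxy requires them.
           .socks5ProxyUsername("username")
           .socks5ProxyPassword("password")
           .build();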
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661206408



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;
+        }
+    }
+
+    public String getSocks5ProxyUsername() {
+        return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername : System.getProperty("socks5Proxy.username");

Review comment:
       Yes, the same as the one above.
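
The getters in the hunk above prefer an explicitly configured value and otherwise fall back to a JVM system property. A minimal sketch of that lookup order follows, using only the JDK. The class name `Socks5ProxySettings`, the helper names, and the `socks5://host:port` value format are assumptions for illustration; only the property name `socks5Proxy.address` comes from the diff. Unlike the diff, the sketch checks for a missing property explicitly instead of relying on a caught exception, and it uses `createUnresolved` to avoid a DNS lookup.

```java
import java.net.InetSocketAddress;
import java.net.URI;

final class Socks5ProxySettings {
    // Property name as it appears in the diff under review.
    static final String ADDRESS_PROPERTY = "socks5Proxy.address";

    /** Returns the explicit address if present, otherwise the parsed system property (or null). */
    static InetSocketAddress resolveAddress(InetSocketAddress explicit) {
        if (explicit != null) {
            return explicit;
        }
        return parse(System.getProperty(ADDRESS_PROPERTY));
    }

    /** Parses a URI-style value such as "socks5://localhost:11080" (the format is an assumption). */
    static InetSocketAddress parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null; // property not set
        }
        try {
            URI uri = URI.create(value.trim());
            if (uri.getHost() == null || uri.getPort() == -1) {
                // A bare "localhost:11080" is read as scheme "localhost", so no host is found.
                return null;
            }
            // createUnresolved skips DNS resolution while reading configuration.
            return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
        } catch (IllegalArgumentException e) {
            return null; // malformed URI or port out of range
        }
    }
}
```

With this shape, `-DsocksProxy...` style configuration only takes effect when the builder value is absent, and a value without a scheme is rejected rather than silently misparsed.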







[GitHub] [pulsar] Anonymitaet commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-869288891


   @Technoboy- many thanks for your details!
   
   As we discussed on our call just now, could you please add docs and ping me for review?
   
   You might need to:
   
   1. Check to which versions the doc will be added.
   - If you want to modify `master` doc, go [here](https://github.com/apache/pulsar/tree/master/site2/docs).
   - If you want to modify `versioned` docs, go [here](https://github.com/apache/pulsar/tree/master/site2/website/versioned_docs).
   
   2. Add docs (the 3 parameters, with the necessary descriptions) [here](https://pulsar.apache.org/docs/en/next/client-libraries-java/#client)
   ![image](https://user-images.githubusercontent.com/50226895/123570190-4d15c400-d7fa-11eb-8b92-97f9e08f82f6.png)
   
   ---
   
   **Tip**
   
   [Java Client API website (doc)](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/) is automatically generated from [its code](https://github.com/apache/pulsar/tree/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api).
   
   If you add descriptions (like you did [here](https://github.com/apache/pulsar/pull/11085/files#diff-33219b2b08e58d5841feb88fdae064889f2477fd77564044ff4bb4e8fc24f1f9) ), you've already added docs for the Java API website. (@codelipenghui  so do not remove the ` component/documentation` label from this PR, I've labeled it back. Anyway many thanks for your label :-D)
   
   ![image](https://user-images.githubusercontent.com/50226895/123570724-68350380-d7fb-11eb-947d-cd75b20f61c9.png)
   
   
   





[GitHub] [pulsar] codelipenghui commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-870517587


   @Anonymitaet I noticed you added the `component/documentation` tag. Do we use it to indicate that a PR contains only documentation changes, or that a PR includes documentation changes?





[GitHub] [pulsar] gaoran10 commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661120415



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -105,6 +109,11 @@
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
+    // socks5
+    private InetSocketAddress socks5ProxyAddress;

Review comment:
       Could we use the `String` type here? We could create the `InetSocketAddress` in `PulsarChannelInitializer.java` when initializing the SOCKS5 handler.
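
The suggestion above amounts to keeping the configuration value as plain text and converting it only when the handler is set up. A rough sketch of such a conversion is below. The class `ProxyAddressParser`, the method `fromHostPort`, and the `host:port` text format are hypothetical and not part of the PR; IPv6 literals are deliberately not handled.

```java
import java.net.InetSocketAddress;

final class ProxyAddressParser {
    /**
     * Converts "host:port" into an unresolved InetSocketAddress.
     * Throws IllegalArgumentException when the text is missing a host or a valid port.
     */
    static InetSocketAddress fromHostPort(String hostPort) {
        if (hostPort == null) {
            throw new IllegalArgumentException("proxy address is null");
        }
        int sep = hostPort.lastIndexOf(':');
        if (sep <= 0 || sep == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, sep);
        int port;
        try {
            port = Integer.parseInt(hostPort.substring(sep + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid port in: " + hostPort, e);
        }
        // createUnresolved also rejects ports outside 0..65535.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

One trade-off of the `String` approach is that a malformed value is only detected when the conversion runs, so validating early (for example in the builder) may still be worthwhile.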







[GitHub] [pulsar] codelipenghui commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r660528462



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {

Review comment:
       It looks like the tests do not actually go through the SOCKS5 proxy. I noticed that nothing calls this method.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        server.shutdown();
+    }
+
+    private void initData() throws PulsarAdminException {
+        admin.tenants().createTenant("public", new TenantInfo() {
+            @Override
+            public Set<String> getAdminRoles() {
+                return Collections.emptySet();
+            }
+
+            @Override
+            public Set<String> getAllowedClusters() {
+                Set<String> clusters = new HashSet<>();
+                clusters.add("test");
+                return clusters;
+            }
+        });
+        admin.namespaces().createNamespace("public/default");
+        admin.topics().createNonPartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testSendAndConsumer() throws PulsarClientException {
+        startSocks5Server(true);
+        // init consumer
+        final String subscriptionName = "socks5-subscription";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        //init producer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        String msg = "abc";
+        producer.send(msg.getBytes());
+        Message<byte[]> message = consumer.receive();
+
+        assertEquals(new String(message.getData()), msg);
+
+        consumer.unsubscribe();
+    }
+
+    @Test
+    public void testDisableAuth() throws PulsarClientException {
+        startSocks5Server(false);
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+        PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        String msg = "abc";
+        producer.send(msg.getBytes());
+    }
+
+    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})

Review comment:
       Why do we get a timeout exception here? Is it because the Pulsar client reconnects automatically?







[GitHub] [pulsar] codelipenghui commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-871821462


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- removed a comment on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- removed a comment on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-871896395


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r660549507



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        server.shutdown();
+    }
+
+    private void initData() throws PulsarAdminException {
+        admin.tenants().createTenant("public", new TenantInfo() {
+            @Override
+            public Set<String> getAdminRoles() {
+                return Collections.emptySet();
+            }
+
+            @Override
+            public Set<String> getAllowedClusters() {
+                Set<String> clusters = new HashSet<>();
+                clusters.add("test");
+                return clusters;
+            }
+        });
+        admin.namespaces().createNamespace("public/default");
+        admin.topics().createNonPartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testSendAndConsumer() throws PulsarClientException {
+        startSocks5Server(true);
+        // init consumer
+        final String subscriptionName = "socks5-subscription";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        //init producer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        String msg = "abc";
+        producer.send(msg.getBytes());
+        Message<byte[]> message = consumer.receive();
+
+        assertEquals(new String(message.getData()), msg);
+
+        consumer.unsubscribe();
+    }
+
+    @Test
+    public void testDisableAuth() throws PulsarClientException {
+        startSocks5Server(false);
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+        PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        String msg = "abc";
+        producer.send(msg.getBytes());
+    }
+
+    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})

Review comment:
       This test covers the case where auth is enabled and the password is incorrect.
   When this test method runs, the Pulsar client cannot connect to the SOCKS5 proxy. The SOCKS5 proxy handler throws ProxyConnectException, but the failure is handled in the Netty thread (the same behavior as the SSL handler), so the Pulsar client keeps trying to reconnect. This is the correct behavior, so to avoid wasting time we stop the test with a timeout and expect ThreadTimeoutException.
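
The behavior described in this comment can be modeled without Pulsar or Netty. The sketch below is a simplified stand-in, not the client's actual reconnect logic: `connectOnce` is a hypothetical connect call that always fails, a background thread retries it with a fixed backoff, and the caller waits with a deadline. Because the failure never leaves the background thread, the only thing the caller can observe is the timeout.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconnectTimeoutSketch {
    /** Counts connection attempts so callers can see that retries happened. */
    static final AtomicInteger attempts = new AtomicInteger();

    /** Hypothetical stand-in for a connect call that the proxy always rejects. */
    static void connectOnce() throws IOException {
        attempts.incrementAndGet();
        throw new IOException("proxy rejected the credentials");
    }

    /**
     * Retries connectOnce() on a background thread and waits up to timeoutMs.
     * Returns true if the wait timed out, false if a connection succeeded.
     */
    static boolean timesOut(long timeoutMs, long backoffMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> connected = executor.submit(() -> {
            while (true) {
                try {
                    connectOnce();
                    return null; // connected
                } catch (IOException e) {
                    // The failure stays on the background thread; back off and retry.
                    Thread.sleep(backoffMs);
                }
            }
        });
        try {
            connected.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // the caller only ever observes the timeout
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow(); // interrupts the retry loop so the thread exits
        }
    }
}
```

Calling `ReconnectTimeoutSketch.timesOut(200, 10)` returns `true` after roughly 200 ms, which mirrors why the test above expects a timeout rather than a connection exception.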







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r659459449



##########
File path: site2/website/versioned_docs/version-2.8.1/client-libraries-java.md
##########
@@ -122,6 +122,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
+SocketAddress|`socks5ProxyAddress`|Socks5 proxy address | None

Review comment:
       If the user configures `socks5ProxyAddress`, the SOCKS5 proxy handler is automatically added as the first handler.
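
As a toy model of that rule (not Netty's pipeline API), the ordering can be pictured as a deque of handler names where the proxy entry is pushed to the front only when an address is configured. The handler names and the class `PipelineOrderSketch` are made up for illustration.

```java
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class PipelineOrderSketch {
    /** Returns the handler names in order; the names are illustrative only. */
    static List<String> handlerOrder(InetSocketAddress socks5ProxyAddress) {
        Deque<String> pipeline = new ArrayDeque<>();
        pipeline.addLast("frameDecoder");
        pipeline.addLast("clientHandler");
        if (socks5ProxyAddress != null) {
            // Put the proxy handler ahead of every other handler.
            pipeline.addFirst("socks5Proxy");
        }
        return new ArrayList<>(pipeline);
    }
}
```

Without an address the list is `[frameDecoder, clientHandler]`; with one, `socks5Proxy` comes first and the rest keep their order.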
   







[GitHub] [pulsar] gaoran10 commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661252713



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -105,6 +109,11 @@
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
+    // socks5
+    private InetSocketAddress socks5ProxyAddress;

Review comment:
       ok







[GitHub] [pulsar] gaoran10 commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661253892



##########
File path: site2/docs/client-libraries-java.md
##########
@@ -121,7 +121,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
-
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None

Review comment:
       ok







[GitHub] [pulsar] Anonymitaet edited a comment on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-869288891


   @Technoboy- many thanks for your details!
   
   As we discussed on our call just now, could you please add docs and ping me for review?
   
   You might need to:
   
   1. Check to which versions the doc will be added.
   - If you want to modify `master` doc, go [here](https://github.com/apache/pulsar/tree/master/site2/docs).
   - If you want to modify `versioned` docs, go [here](https://github.com/apache/pulsar/tree/master/site2/website/versioned_docs).
   
   2. Add docs (the 3 parameters, with the necessary descriptions) [here](https://pulsar.apache.org/docs/en/next/client-libraries-java/#client)
   ![image](https://user-images.githubusercontent.com/50226895/123570190-4d15c400-d7fa-11eb-8b92-97f9e08f82f6.png)
   
   3. Besides, [this](https://docs.google.com/spreadsheets/d/1iTzn6QnOpTYK8QQHxXRb6a25nvuTNGPPJDdUQHGBUS4/edit#gid=1784579914) is the doc architecture for the various clients; you can use it as a reference.
   
   ---
   
   **Tip**
   
   [Java Client API website (doc)](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/) is automatically generated from [its code](https://github.com/apache/pulsar/tree/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api).
   
   If you add descriptions (like you did [here](https://github.com/apache/pulsar/pull/11085/files#diff-33219b2b08e58d5841feb88fdae064889f2477fd77564044ff4bb4e8fc24f1f9) ), you've already added docs for the Java API website. (@codelipenghui  so do not remove the ` component/documentation` label from this PR, I've labeled it back. Anyway many thanks for your label :-D)
   
   ![image](https://user-images.githubusercontent.com/50226895/123570724-68350380-d7fb-11eb-947d-cd75b20f61c9.png)
   
   
   





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661200748



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/socks5/handler/InitialRequestHandler.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.socks5.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.socksx.SocksVersion;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialRequest;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialResponse;
+import io.netty.handler.codec.socksx.v5.Socks5AuthMethod;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.socks5.config.Socks5Config;
+
+@Slf4j
+public class InitialRequestHandler extends SimpleChannelInboundHandler<DefaultSocks5InitialRequest> {
+
+	private final Socks5Config socks5Config;
+
+	public InitialRequestHandler(final Socks5Config socks5Config) {
+		this.socks5Config = socks5Config;
+	}
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, DefaultSocks5InitialRequest msg) throws Exception {
+		if (SocksVersion.SOCKS5.equals(msg.version())) {
+			if (msg.decoderResult().isFailure()) {
+				log.warn("decode failure : {}", msg.decoderResult());
+				ctx.fireChannelRead(msg);
+			} else {
+				if (SocksVersion.SOCKS5.equals(msg.version())) {

Review comment:
       Yes, thanks. I will remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Technoboy- commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-871938006


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-871896395


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r660550604



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {

Review comment:
       The test method `testSendAndConsumer` uses this to customize `pulsarClient`.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r660528462



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {

Review comment:
       It looks like the tests do not apply the SOCKS5 proxy. I noticed that nothing calls this method.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        server.shutdown();
+    }
+
+    private void initData() throws PulsarAdminException {
+        admin.tenants().createTenant("public", new TenantInfo() {
+            @Override
+            public Set<String> getAdminRoles() {
+                return Collections.emptySet();
+            }
+
+            @Override
+            public Set<String> getAllowedClusters() {
+                Set<String> clusters = new HashSet<>();
+                clusters.add("test");
+                return clusters;
+            }
+        });
+        admin.namespaces().createNamespace("public/default");
+        admin.topics().createNonPartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testSendAndConsumer() throws PulsarClientException {
+        startSocks5Server(true);
+        // init consumer
+        final String subscriptionName = "socks5-subscription";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        //init producer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        String msg = "abc";
+        producer.send(msg.getBytes());
+        Message<byte[]> message = consumer.receive();
+
+        assertEquals(new String(message.getData()), msg);
+
+        consumer.unsubscribe();
+    }
+
+    @Test
+    public void testDisableAuth() throws PulsarClientException {
+        startSocks5Server(false);
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+        PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        String msg = "abc";
+        producer.send(msg.getBytes());
+    }
+
+    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})

Review comment:
       Why do we get a timeout exception here? Is it because the Pulsar client automatically reconnects?







[GitHub] [pulsar] gaoran10 commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661226868



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");

Review comment:
       ok







[GitHub] [pulsar] Technoboy- commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-869298237


   > @Technoboy- many thanks for your details!
   > 
   > As we discussed on our call just now, could you please add docs and ping me for review?
   > 
   > You might need to:
   > 
   > 1. Check to which versions the doc will be added.
   > 
   > * If you want to modify `master` doc, go [here](https://github.com/apache/pulsar/tree/master/site2/docs).
   > * If you want to modify `versioned` docs, go [here](https://github.com/apache/pulsar/tree/master/site2/website/versioned_docs).
   > 
   > 2. Add docs (the 3 parameters, with the necessary descriptions) [here](https://pulsar.apache.org/docs/en/next/client-libraries-java/#client)
   >    ![image](https://user-images.githubusercontent.com/50226895/123570190-4d15c400-d7fa-11eb-8b92-97f9e08f82f6.png)
   > 3. Besides, [this](https://docs.google.com/spreadsheets/d/1iTzn6QnOpTYK8QQHxXRb6a25nvuTNGPPJDdUQHGBUS4/edit#gid=1784579914) is the doc architecture for various clients; you can take it as a reference.
   > 
   > **Tip**
   > 
   > [Java Client API website (doc)](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/) is automatically generated from [its code](https://github.com/apache/pulsar/tree/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api).
   > 
   > If you add descriptions (like you did [here](https://github.com/apache/pulsar/pull/11085/files#diff-33219b2b08e58d5841feb88fdae064889f2477fd77564044ff4bb4e8fc24f1f9) ), you've already added docs for the Java API website. (@codelipenghui so do not remove the ` component/documentation` label from this PR, I've labeled it back. Anyway many thanks for your label :-D)
   > 
   > ![image](https://user-images.githubusercontent.com/50226895/123570724-68350380-d7fb-11eb-947d-cd75b20f61c9.png)
   
   Done. Please check client-libraries-java.md.





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661203380



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");

Review comment:
       Yes. This is not a common feature, so @codelipenghui suggested supporting it through system properties. For now the client builder API is kept as well, but we may remove it some day.
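
The lookup order discussed in this thread (explicit builder value first, then the `socks5Proxy.address` system property) could be sketched roughly as follows. This is only an illustration and not the code in the PR: the class name `Socks5AddressResolver` and the `resolve` method are hypothetical, the `socks5://host:port` form is an assumed format for the property value, and unlike the getter in the diff it reports a malformed property instead of returning `null`.

```java
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Optional;

// Hypothetical helper, not part of the Pulsar client.
public class Socks5AddressResolver {

    // Property name taken from the diff under review.
    public static final String ADDRESS_PROPERTY = "socks5Proxy.address";

    /**
     * Returns the explicitly configured address if there is one; otherwise
     * falls back to the system property. An unset property yields an empty
     * Optional, a malformed one throws so the misconfiguration is visible.
     */
    public static Optional<InetSocketAddress> resolve(InetSocketAddress explicit) {
        if (explicit != null) {
            return Optional.of(explicit);
        }
        String raw = System.getProperty(ADDRESS_PROPERTY);
        if (raw == null || raw.trim().isEmpty()) {
            return Optional.empty();
        }
        // URI.create throws IllegalArgumentException on invalid syntax.
        URI uri = URI.create(raw.trim());
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException(
                    ADDRESS_PROPERTY + " must look like socks5://host:port, got: " + raw);
        }
        // Assumption: keep the address unresolved to avoid a DNS lookup here.
        return Optional.of(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
    }

    public static void main(String[] args) {
        System.setProperty(ADDRESS_PROPERTY, "socks5://localhost:11080");
        System.out.println(resolve(null));
    }
}
```

Returning an `Optional` keeps "not configured" distinct from "configured incorrectly", which a bare `null` cannot express.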







[GitHub] [pulsar] Anonymitaet commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r659457592



##########
File path: site2/website/versioned_docs/version-2.8.1/client-libraries-java.md
##########
@@ -122,6 +122,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
+SocketAddress|`socks5ProxyAddress`|Socks5 proxy address | None

Review comment:
       ```suggestion
   SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None
   ```
   
   Please pay attention to the capitalization of the proper noun: https://en.wikipedia.org/wiki/SOCKS
   
   Please correct all occurrences, thanks. 

##########
File path: site2/website/versioned_docs/version-2.8.1/client-libraries-java.md
##########
@@ -122,6 +122,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
+SocketAddress|`socks5ProxyAddress`|Socks5 proxy address | None

Review comment:
       Do we need to add descriptions as below?
   
   If you want to use the SOCKS5 proxy, you can add a SOCKS5 proxy handler in the connection pool as the first handler for both producer and consumer.
   







[GitHub] [pulsar] gaoran10 commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661112139



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
##########
@@ -294,9 +294,13 @@ void closeAllConnections() {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
                             .initTls(channel, sniHost != null ? sniHost : remoteAddress))
+                    .thenCompose(channel -> channelInitializerHandler
+                            .initSocks5IfConfig(channel))

Review comment:
       Does TLS work well with SOCKS5?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();

Review comment:
       Use a logger here instead of `printStackTrace()`?
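
A minimal sketch of what this suggestion might look like, assuming `java.util.logging` so the example stays self-contained (the Pulsar code base itself uses SLF4J, as the `@Slf4j` annotation elsewhere in this PR shows). `BackgroundStarter` and `Startable` are hypothetical stand-ins for the test class and its `Socks5Server`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the server start-up code in the test.
public class BackgroundStarter {

    private static final Logger LOG = Logger.getLogger(BackgroundStarter.class.getName());

    /** Hypothetical stand-in for Socks5Server: anything with a start() that may throw. */
    public interface Startable {
        void start() throws Exception;
    }

    /** Starts the server on a daemon thread and logs a start-up failure with its stack trace. */
    public static Thread startInBackground(Startable server) {
        Thread thread = new Thread(() -> {
            try {
                server.start();
            } catch (Exception e) {
                // Goes through the logging framework instead of printing to stderr directly.
                LOG.log(Level.SEVERE, "Failed to start SOCKS5 test server", e);
            }
        }, "socks5-test-server");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }
}
```

Naming the thread also makes it easier to find in a thread dump if the test hangs.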

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");

Review comment:
       Does the client need to be configurable through system properties?

##########
File path: site2/docs/client-libraries-java.md
##########
@@ -121,7 +121,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
-
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None

Review comment:
       The type is `String`?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;

Review comment:
       Why ignore this exception?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;
+        }
+    }
+
+    public String getSocks5ProxyUsername() {
+        return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername : System.getProperty("socks5Proxy.username");
+    }
 
+    public String getSocks5ProxyPassword() {
+        return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword : System.getProperty("socks5Proxy.password");

Review comment:
       Same as above.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;
+        }
+    }
+
+    public String getSocks5ProxyUsername() {
+        return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername : System.getProperty("socks5Proxy.username");

Review comment:
       Does the client need to be configurable through system properties?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -105,6 +109,11 @@
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
+    // socks5
+    private InetSocketAddress socks5ProxyAddress;

Review comment:
       Could we use the `String` type here? We could create the `InetSocketAddress` in `PulsarChannelInitializer.java` when initializing the SOCKS5 handler.
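
One way to read this suggestion, sketched under assumptions: the configuration holds a plain `host:port` string and the conversion to `InetSocketAddress` happens only when the handler is set up. The `ProxyAddressParser` class and the `host:port` format are hypothetical and not taken from the PR.

```java
import java.net.InetSocketAddress;

// Hypothetical helper; the PR does not define this class.
public class ProxyAddressParser {

    /**
     * Parses "host:port" into an unresolved InetSocketAddress.
     * Bracketed IPv6 literals are not handled specially; the brackets stay in the host.
     */
    public static InetSocketAddress parse(String hostPort) {
        if (hostPort == null) {
            throw new IllegalArgumentException("proxy address is null");
        }
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port;
        try {
            port = Integer.parseInt(hostPort.substring(idx + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid port in: " + hostPort, e);
        }
        // createUnresolved rejects ports outside 0..65535 with IllegalArgumentException.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

A string-typed field is also simpler to serialize in a configuration object than an `InetSocketAddress`.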

##########
File path: site2/website/versioned_docs/version-2.8.1/client-libraries-java.md
##########
@@ -122,6 +122,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None

Review comment:
       The type is `String`?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        server.shutdown();
+    }
+
+    private void initData() throws PulsarAdminException {
+        admin.tenants().createTenant("public", new TenantInfo() {
+            @Override
+            public Set<String> getAdminRoles() {
+                return Collections.emptySet();
+            }
+
+            @Override
+            public Set<String> getAllowedClusters() {
+                Set<String> clusters = new HashSet<>();
+                clusters.add("test");
+                return clusters;
+            }
+        });
+        admin.namespaces().createNamespace("public/default");
+        admin.topics().createNonPartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testSendAndConsumer() throws PulsarClientException {
+        startSocks5Server(true);
+        // init consumer
+        final String subscriptionName = "socks5-subscription";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        //init producer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        String msg = "abc";
+        producer.send(msg.getBytes());
+        Message<byte[]> message = consumer.receive();
+
+        assertEquals(new String(message.getData()), msg);
+
+        consumer.unsubscribe();
+    }
+
+    @Test
+    public void testDisableAuth() throws PulsarClientException {
+        startSocks5Server(false);
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+        PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        String msg = "abc";
+        producer.send(msg.getBytes());
+    }
+
+    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})

Review comment:
       Is there a clearer indicator for a password error?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/socks5/handler/InitialRequestHandler.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.socks5.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.socksx.SocksVersion;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialRequest;
+import io.netty.handler.codec.socksx.v5.DefaultSocks5InitialResponse;
+import io.netty.handler.codec.socksx.v5.Socks5AuthMethod;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.socks5.config.Socks5Config;
+
+@Slf4j
+public class InitialRequestHandler extends SimpleChannelInboundHandler<DefaultSocks5InitialRequest> {
+
+	private final Socks5Config socks5Config;
+
+	public InitialRequestHandler(final Socks5Config socks5Config) {
+		this.socks5Config = socks5Config;
+	}
+
+	@Override
+	protected void channelRead0(ChannelHandlerContext ctx, DefaultSocks5InitialRequest msg) throws Exception {
+		if (SocksVersion.SOCKS5.equals(msg.version())) {
+			if (msg.decoderResult().isFailure()) {
+				log.warn("decode failure : {}", msg.decoderResult());
+				ctx.fireChannelRead(msg);
+			} else {
+				if (SocksVersion.SOCKS5.equals(msg.version())) {

Review comment:
       It seems that this check is duplicated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085


   





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661199930



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();

Review comment:
       Yes, I will replace it with a log statement. Thanks for reviewing.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661207008



##########
File path: site2/docs/client-libraries-java.md
##########
@@ -121,7 +121,9 @@ int|`connectionTimeoutMs`|Duration of waiting for a connection to a broker to be
 int|`requestTimeoutMs`|Maximum duration for completing a request |60000
 int|`defaultBackoffIntervalNanos`| Default duration for a backoff interval | TimeUnit.MILLISECONDS.toNanos(100);
 long|`maxBackoffIntervalNanos`|Maximum duration for a backoff interval|TimeUnit.SECONDS.toNanos(30)
-
+SocketAddress|`socks5ProxyAddress`|SOCKS5 proxy address | None

Review comment:
       This is for ClientBuilder, so I think it should stay consistent with ClientBuilder.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661204474



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;

Review comment:
       I will change it to print a warning log instead.
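
A minimal sketch of what the warning-log variant could look like, using only the JDK. The class name `Socks5PropertyWarnSketch`, the method `parseOrWarn`, and the use of `java.util.logging` are assumptions for illustration; the PR itself reads the `socks5Proxy.address` system property inside `ClientConfigurationData` and may log differently.

```java
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: parses a "scheme://host:port" value
// and logs a warning instead of silently swallowing a malformed value.
final class Socks5PropertyWarnSketch {
    private static final Logger LOG = Logger.getLogger("Socks5PropertyWarnSketch");

    static InetSocketAddress parseOrWarn(String proxyAddress) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            return null; // property not set: no proxy configured, nothing to warn about
        }
        try {
            URI uri = URI.create(proxyAddress);
            if (uri.getHost() == null || uri.getPort() == -1) {
                LOG.warning("Invalid socks5Proxy.address, expected scheme://host:port: " + proxyAddress);
                return null;
            }
            // createUnresolved avoids a DNS lookup in this sketch;
            // the quoted diff uses new InetSocketAddress(host, port).
            return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
        } catch (IllegalArgumentException e) {
            // URI.create wraps syntax errors in IllegalArgumentException
            LOG.warning("Invalid socks5Proxy.address '" + proxyAddress + "': " + e.getMessage());
            return null;
        }
    }
}
```

A caller would pass `System.getProperty("socks5Proxy.address")` to `parseOrWarn`. A value such as `localhost:11080` without a scheme is treated as malformed here, because `URI` then reports no host.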







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661201433



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
##########
@@ -294,9 +294,13 @@ void closeAllConnections() {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
                             .initTls(channel, sniHost != null ? sniHost : remoteAddress))
+                    .thenCompose(channel -> channelInitializerHandler
+                            .initSocks5IfConfig(channel))

Review comment:
       Yes. If both SOCKS5 and TLS are configured, the SOCKS5 proxy handler will be the first handler.
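
The ordering can be pictured with a toy model. This is not Netty code: the pipeline is modeled as a plain `Deque` of handler names, and it assumes that each init step installs its handler at the head of the pipeline. The class name, method name, and handler names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: a pipeline is an ordered deque of handler names (head = first handler).
final class HandlerOrderSketch {
    static List<String> buildPipeline(boolean tlsEnabled, boolean socks5Configured) {
        Deque<String> pipeline = new ArrayDeque<>();
        pipeline.addLast("pulsar-protocol-handler"); // hypothetical name for the existing handlers
        if (tlsEnabled) {
            pipeline.addFirst("tls");    // the initTls step runs first in the chain
        }
        if (socks5Configured) {
            pipeline.addFirst("socks5"); // the SOCKS5 step runs later, so it lands ahead of TLS
        }
        return new ArrayList<>(pipeline);
    }
}
```

With both options enabled, the model yields socks5, then tls, then the protocol handler, which matches the intent that the proxy handshake completes before the TLS handshake starts.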







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r660549507



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.socks5ProxyAddress(new InetSocketAddress("localhost", 11080))
+                .socks5ProxyUsername("socks5")
+                .socks5ProxyPassword("pulsar");
+    }
+
+    private void startSocks5Server(boolean enableAuth) {
+        Socks5Config config = new Socks5Config();
+        config.setPort(11080);
+        config.setEnableAuth(enableAuth);
+        server = new Socks5Server(config);
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    server.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        server.shutdown();
+    }
+
+    private void initData() throws PulsarAdminException {
+        admin.tenants().createTenant("public", new TenantInfo() {
+            @Override
+            public Set<String> getAdminRoles() {
+                return Collections.emptySet();
+            }
+
+            @Override
+            public Set<String> getAllowedClusters() {
+                Set<String> clusters = new HashSet<>();
+                clusters.add("test");
+                return clusters;
+            }
+        });
+        admin.namespaces().createNamespace("public/default");
+        admin.topics().createNonPartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testSendAndConsumer() throws PulsarClientException {
+        startSocks5Server(true);
+        // init consumer
+        final String subscriptionName = "socks5-subscription";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        //init producer
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        String msg = "abc";
+        producer.send(msg.getBytes());
+        Message<byte[]> message = consumer.receive();
+
+        assertEquals(new String(message.getData()), msg);
+
+        consumer.unsubscribe();
+    }
+
+    @Test
+    public void testDisableAuth() throws PulsarClientException {
+        startSocks5Server(false);
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .socks5ProxyAddress(new InetSocketAddress("localhost", 11080));
+        PulsarClient pulsarClient = replacePulsarClient(clientBuilder);
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        String msg = "abc";
+        producer.send(msg.getBytes());
+    }
+
+    @Test(timeOut = 5000, expectedExceptions = {ThreadTimeoutException.class})

Review comment:
       This test covers the case where auth is enabled and the password is incorrect.
   When this test method runs, the Pulsar client cannot connect through the SOCKS5 proxy. The SOCKS5 proxy handler throws a ProxyConnectException, but the result is processed on a Netty thread (the same behavior as with the SSL handler), so the Pulsar client keeps trying to reconnect. This is the correct behavior, so to avoid wasting time we had better stop the test with a timeout and expect ThreadTimeoutException.
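
The effect described here can be sketched with the JDK alone. The example assumes the failed connection surfaces to the caller as a future that never completes (because the client keeps retrying), so the only observable signal is a bounded wait that expires. `BoundedWaitSketch` and `timesOut` are hypothetical names, and this is not how TestNG implements `timeOut`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration: distinguishes "never completed" from "failed" or "succeeded".
final class BoundedWaitSketch {
    // Returns true only if the future did not complete within the given time.
    static boolean timesOut(CompletableFuture<?> pending, long millis) {
        try {
            pending.get(millis, TimeUnit.MILLISECONDS);
            return false; // completed normally
        } catch (TimeoutException e) {
            return true;  // still pending: the case the test above expects
        } catch (ExecutionException e) {
            return false; // completed exceptionally: a definite failure, not a timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

A future that fails explicitly would give the clearer signal the reviewer asks about. With endless reconnects there is no such completion, which is why the test falls back to a timeout.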

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientWithSocks5ProxyTest.java
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.socks5.Socks5Server;
+import org.apache.pulsar.socks5.config.Socks5Config;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.internal.thread.ThreadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+
+@Test
+public class ClientWithSocks5ProxyTest extends BrokerTestBase {
+
+    private Socks5Server server;
+
+    final String topicName = "persistent://public/default/socks5";
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        baseSetup();
+        initData();
+    }
+
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {

Review comment:
       The test method 'testSendAndConsumer' uses this to customize pulsarClient.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661213349



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -105,6 +109,11 @@
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
+    // socks5
+    private InetSocketAddress socks5ProxyAddress;

Review comment:
       I feel a String is not precise enough. If it is defined as a String, it usually has to be parsed (as with the system property).
   BTW, it matches how the Socks5ProxyHandler in Netty defines it.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#discussion_r661204474



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
##########
@@ -133,5 +142,24 @@ public ClientConfigurationData clone() {
         }
     }
 
+    public InetSocketAddress getSocks5ProxyAddress() {
+        if (Objects.nonNull(socks5ProxyAddress)) {
+            return socks5ProxyAddress;
+        }
+        String proxyAddress = System.getProperty("socks5Proxy.address");
+        try {
+            URI uri = URI.create(proxyAddress);
+            return new InetSocketAddress(uri.getHost(), uri.getPort());
+        } catch (Exception ignore) {
+            return null;

Review comment:
       I will change it to throw the exception.







[GitHub] [pulsar] codelipenghui commented on pull request #11085: Support socks5 proxy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #11085:
URL: https://github.com/apache/pulsar/pull/11085#issuecomment-871826858


   @Technoboy- It looks like the failed tests are related to this change, PTAL.

