You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/10 07:02:33 UTC

[pulsar] branch branch-2.6 updated (156b4be -> 7f04056)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 156b4be  Fix zk cache expiration check (#8458)
     new b2f0837  Fix request.getContentLength() to return 0 if it is less than 0 (#8448)
     new bcb97d1  [Pulsar Proxy] Add error log for pulsar proxy starter (#8451)
     new 7d00d0b  add elapsedMs for create ledger in log (#8473)
     new 7f04056  [C++] Catch exception thrown by remote_endpoint (#8486)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   3 +-
 pulsar-client-cpp/lib/BlockingQueue.h              |   1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  12 +-
 pulsar-client-cpp/tests/ClientTest.cc              |  13 ++
 .../pulsar/proxy/server/AdminProxyHandler.java     |   2 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 160 +++++++++++----------
 .../pulsar/proxy/server/AdminProxyHandlerTest.java |  56 ++++++++
 7 files changed, 166 insertions(+), 81 deletions(-)
 create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java


[pulsar] 04/04: [C++] Catch exception thrown by remote_endpoint (#8486)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7f04056c441a2cdb74817c4d666875b5660ebfcb
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Nov 10 14:44:46 2020 +0800

    [C++] Catch exception thrown by remote_endpoint (#8486)
    
    ### Motivation
    
    Boost asio `socket::remote_endpoint` may throw `boost::system::system_error` on failure. If the C++ client library was compiled against an old Boost version, such as 1.54, the server could still be unreachable even if `async_connect` succeeds, and an exception would be thrown by `socket_->remote_endpoint()`.
    
    ### Modifications
    
    - Catch the exception thrown by `socket_->remote_endpoint()`
    - Add tests for when the client connects to an unreachable service URL
    - Fix header dependency because some Boost *.hpp files don't include assert.h
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
    
      - Run `testServerConnectError`.
    
    (cherry picked from commit 53cd2bbbe173dadc08dc07f7f3e88c6762e87814)
---
 pulsar-client-cpp/lib/BlockingQueue.h     |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc | 12 +++++++++---
 pulsar-client-cpp/tests/ClientTest.cc     | 13 +++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 1cdaf93..5e466bd 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -19,6 +19,7 @@
 #ifndef LIB_BLOCKINGQUEUE_H_
 #define LIB_BLOCKINGQUEUE_H_
 
+#include <assert.h>
 #include <mutex>
 #include <condition_variable>
 #include <boost/circular_buffer.hpp>
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index ae44b52..959d64b 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -337,9 +337,15 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
                                           tcp::resolver::iterator endpointIterator) {
     if (!err) {
         std::stringstream cnxStringStream;
-        cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
-        cnxString_ = cnxStringStream.str();
-
+        try {
+            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
+                            << "] ";
+            cnxString_ = cnxStringStream.str();
+        } catch (const boost::system::system_error& e) {
+            LOG_ERROR("Failed to get endpoint: " << e.what());
+            close();
+            return;
+        }
         if (logicalAddress_ == physicalAddress_) {
             LOG_INFO(cnxString_ << "Connected to broker");
         } else {
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index d4eaca7..91232dc 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -73,3 +73,16 @@ TEST(ClientTest, testSwHwChecksum) {
     ASSERT_EQ(hwIncrementalChecksum, hwDoubleChecksum);
     ASSERT_EQ(hwIncrementalChecksum, swIncrementalChecksum);
 }
+
+TEST(ClientTest, testServerConnectError) {
+    const std::string topic = "test-server-connect-error";
+    Client client("pulsar://localhost:65535");
+    Producer producer;
+    ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
+    Consumer consumer;
+    ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
+    Reader reader;
+    ReaderConfiguration readerConf;
+    ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    client.close();
+}


[pulsar] 01/04: Fix request.getContentLength() to return 0 if it is less than 0 (#8448)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b2f0837b8003c3d23611384a2fff63b17b23506b
Author: かとかい <25...@gmail.com>
AuthorDate: Thu Nov 5 15:22:56 2020 +0900

    Fix request.getContentLength() to return 0 if it is less than 0 (#8448)
    
    ### Motivation
    
    - A "Negative initial size: -1" error occurs when submitting an HTTP request that has "Content-Type: application/json" and no request body.
    
    ### Modifications
    
    - Pass 0 as the initial size to the ByteArrayOutputStream constructor if request.getContentLength() returns a negative number.
    - Add unit test.
    
    
    (cherry picked from commit 9ff92845e67a468938f135ff72d91755d16f2cbf)
---
 .../pulsar/proxy/server/AdminProxyHandler.java     |  2 +-
 .../pulsar/proxy/server/AdminProxyHandlerTest.java | 56 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 697ddf9..28a3140 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -169,7 +169,7 @@ class AdminProxyHandler extends ProxyServlet {
         private final ByteArrayOutputStream bodyBuffer;
         protected ReplayableProxyContentProvider(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, InputStream input) {
             super(request, response, proxyRequest, input);
-            bodyBuffer = new ByteArrayOutputStream(request.getContentLength());
+            bodyBuffer = new ByteArrayOutputStream(Math.max(request.getContentLength(), 0));
         }
 
         @Override
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
new file mode 100644
index 0000000..ad7f0e5
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.*;
+
+import org.eclipse.jetty.client.api.Request;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class AdminProxyHandlerTest {
+
+    @Test
+    public void replayableProxyContentProviderTest() throws Exception {
+
+        AdminProxyHandler adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
+                mock(BrokerDiscoveryProvider.class));
+
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        doReturn(-1).when(request).getContentLength();
+
+        try {
+            AdminProxyHandler.ReplayableProxyContentProvider replayableProxyContentProvider = adminProxyHandler.new ReplayableProxyContentProvider(
+                    request, mock(HttpServletResponse.class), mock(Request.class), mock(InputStream.class));
+            Field field = replayableProxyContentProvider.getClass().getDeclaredField("bodyBuffer");
+            field.setAccessible(true);
+            Assert.assertEquals(((ByteArrayOutputStream) field.get(replayableProxyContentProvider)).size(), 0);
+        } catch (IllegalArgumentException e) {
+            Assert.fail("IllegalArgumentException should not be thrown");
+        }
+
+    }
+}


[pulsar] 02/04: [Pulsar Proxy] Add error log for pulsar proxy starter (#8451)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bcb97d16adcca76c452e622e265b454b03213bbb
Author: ran <ga...@126.com>
AuthorDate: Thu Nov 5 14:24:29 2020 +0800

    [Pulsar Proxy] Add error log for pulsar proxy starter (#8451)
    
    ### Motivation
    
    Currently, the proxy service starter throws all exceptions directly to the main method, so it is hard to find the error in the log if an exception occurs when the proxy service starts.
    
    ### Modifications
    
    Add try-catch for proxy start method.
    
    (cherry picked from commit 442a54704e4586f604050727cfc28ef17c3f7e02)
---
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 160 +++++++++++----------
 1 file changed, 84 insertions(+), 76 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 695bd0c..3a30b47 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
@@ -76,97 +77,104 @@ public class ProxyServiceStarter {
     private boolean help = false;
 
     public ProxyServiceStarter(String[] args) throws Exception {
-        // setup handlers
-        removeHandlersForRootLogger();
-        install();
-
-        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
-        Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
-            System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
-        });
-
-        JCommander jcommander = new JCommander();
         try {
-            jcommander.addObject(this);
-            jcommander.parse(args);
-            if (help || isBlank(configFile)) {
-                jcommander.usage();
-                return;
-            }
-        } catch (Exception e) {
-            jcommander.usage();
-            System.exit(-1);
-        }
-
-        // load config file
-        final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
-
-        if (!isBlank(zookeeperServers)) {
-            // Use zookeeperServers from command line
-            config.setZookeeperServers(zookeeperServers);
-        }
-
-        if (!isBlank(globalZookeeperServers)) {
-            // Use globalZookeeperServers from command line
-            config.setConfigurationStoreServers(globalZookeeperServers);
-        }
-        if (!isBlank(configurationStoreServers)) {
-            // Use configurationStoreServers from command line
-            config.setConfigurationStoreServers(configurationStoreServers);
-        }
-
-        if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
-                || config.isAuthorizationEnabled()) {
-            checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
-            checkArgument(!isEmpty(config.getConfigurationStoreServers()),
-                    "configurationStoreServers must be provided");
-        }
 
-        if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
-                || (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
-            checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
-        }
+            // setup handlers
+            removeHandlersForRootLogger();
+            install();
 
-        AuthenticationService authenticationService = new AuthenticationService(
-                PulsarConfigurationLoader.convertFrom(config));
-        // create proxy service
-        ProxyService proxyService = new ProxyService(config, authenticationService);
-        // create a web-service
-        final WebServer server = new WebServer(config, authenticationService);
+            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
+            Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
+                System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+            });
 
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            JCommander jcommander = new JCommander();
             try {
-                proxyService.close();
-                server.stop();
+                jcommander.addObject(this);
+                jcommander.parse(args);
+                if (help || isBlank(configFile)) {
+                    jcommander.usage();
+                    return;
+                }
             } catch (Exception e) {
-                log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
+                jcommander.usage();
+                System.exit(-1);
             }
-        }));
 
-        proxyService.start();
+            // load config file
+            final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
 
-        // Setup metrics
-        DefaultExports.initialize();
+            if (!isBlank(zookeeperServers)) {
+                // Use zookeeperServers from command line
+                config.setZookeeperServers(zookeeperServers);
+            }
 
-        // Report direct memory from Netty counters
-        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
-            @Override
-            public double get() {
-                return getJvmDirectMemoryUsed();
+            if (!isBlank(globalZookeeperServers)) {
+                // Use globalZookeeperServers from command line
+                config.setConfigurationStoreServers(globalZookeeperServers);
+            }
+            if (!isBlank(configurationStoreServers)) {
+                // Use configurationStoreServers from command line
+                config.setConfigurationStoreServers(configurationStoreServers);
+            }
+
+            if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
+                    || config.isAuthorizationEnabled()) {
+                checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
+                checkArgument(!isEmpty(config.getConfigurationStoreServers()),
+                        "configurationStoreServers must be provided");
             }
-        }).register(CollectorRegistry.defaultRegistry);
 
-        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
-            @Override
-            public double get() {
-                return PlatformDependent.maxDirectMemory();
+            if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
+                    || (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
+                checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
             }
-        }).register(CollectorRegistry.defaultRegistry);
 
-        addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+            AuthenticationService authenticationService = new AuthenticationService(
+                    PulsarConfigurationLoader.convertFrom(config));
+            // create proxy service
+            ProxyService proxyService = new ProxyService(config, authenticationService);
+            // create a web-service
+            final WebServer server = new WebServer(config, authenticationService);
+
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                try {
+                    proxyService.close();
+                    server.stop();
+                } catch (Exception e) {
+                    log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
+                }
+            }));
+
+            proxyService.start();
+
+            // Setup metrics
+            DefaultExports.initialize();
+
+            // Report direct memory from Netty counters
+            Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
+                @Override
+                public double get() {
+                    return getJvmDirectMemoryUsed();
+                }
+            }).register(CollectorRegistry.defaultRegistry);
+
+            Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
+                @Override
+                public double get() {
+                    return PlatformDependent.maxDirectMemory();
+                }
+            }).register(CollectorRegistry.defaultRegistry);
+
+            addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+
+            // start web-service
+            server.start();
 
-        // start web-service
-        server.start();
+        } catch (Exception e) {
+            log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e);
+            throw new PulsarServerException(e);
+        }
     }
 
     public static void main(String[] args) throws Exception {


[pulsar] 03/04: add elapsedMs for create ledger in log (#8473)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7d00d0b8daac72d3b3cd4f87fd0df4cf0326dcf9
Author: Aloys <lo...@gmail.com>
AuthorDate: Sun Nov 8 16:43:31 2020 +0800

    add elapsedMs for create ledger in log (#8473)
    
    ### Motivation
    Fix the log message that contained only placeholders (`{}`) with no arguments supplied.
    
    (cherry picked from commit a3584309017f1894a05b05c695c42e7aa8b7c3a7)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b6e408e..0a42dc5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -636,7 +636,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
                 if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
                     log.info("[{}] Ledger creation was initiated {} ms ago but it never completed" +
-                        " and creation timeout task didn't kick in as well. Force to fail the create ledger operation ...");
+                        " and creation timeout task didn't kick in as well. Force to fail the create ledger operation.",
+                            name, elapsedMs);
                     this.createComplete(Code.TimeoutException, null, null);
                 }
             }