Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:19:17 UTC

[pulsar] branch branch-2.7 updated (e3084f9 -> 5668058)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from e3084f9  Remove external CIs (#9069)
     new 8457557  Fix Proxy Config bindAddress does not working for servicePort  (#9068)
     new 10f674b  [connector]fix debezium-connector error log (#9063)
     new 5668058  Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/proxy.conf                                    |  3 +
 .../pulsar/broker/service/BrokerService.java       | 17 ++++--
 .../broker/service/NonPersistentTopicE2ETest.java  | 66 +++++++++++++++++++++-
 .../apache/pulsar/functions/sink/PulsarSink.java   |  8 ++-
 .../apache/pulsar/proxy/server/ProxyService.java   |  4 +-
 5 files changed, 87 insertions(+), 11 deletions(-)


[pulsar] 01/03: Fix Proxy Config bindAddress does not working for servicePort (#9068)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 84575578dbebffd97fa1cf603a7c4cee822088a8
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Tue Dec 29 13:46:54 2020 +0800

    Fix Proxy Config bindAddress does not working for servicePort  (#9068)
    
    The proxy config `bindAddress` only takes effect for the webPort; it is not applied to the servicePort (or the TLS service port), which therefore always bind on all interfaces.
    
    (cherry picked from commit 29921a63bc9b55aa0349d77a5b2c5459dd3cc6e4)
---
 conf/proxy.conf                                                       | 3 +++
 .../src/main/java/org/apache/pulsar/proxy/server/ProxyService.java    | 4 ++--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 820eca3..91b0196 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -46,6 +46,9 @@ zooKeeperCacheExpirySeconds=300
 
 ### --- Server --- ###
 
+# Hostname or IP address the service binds on, default is 0.0.0.0.
+bindAddress=0.0.0.0
+
 # Hostname or IP address the service advertises to the outside world.
 # If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
 advertisedAddress=
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c25484c..e45f9c6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -177,7 +177,7 @@ public class ProxyService implements Closeable {
         // Bind and start to accept incoming connections.
         if (proxyConfig.getServicePort().isPresent()) {
             try {
-                listenChannel = bootstrap.bind(proxyConfig.getServicePort().get()).sync().channel();
+                listenChannel = bootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePort().get()).sync().channel();
                 LOG.info("Started Pulsar Proxy at {}", listenChannel.localAddress());
             } catch (Exception e) {
                 throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getServicePort().get(), e);
@@ -187,7 +187,7 @@ public class ProxyService implements Closeable {
         if (proxyConfig.getServicePortTls().isPresent()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
             tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
-            listenChannelTls = tlsBootstrap.bind(proxyConfig.getServicePortTls().get()).sync().channel();
+            listenChannelTls = tlsBootstrap.bind(proxyConfig.getBindAddress(), proxyConfig.getServicePortTls().get()).sync().channel();
             LOG.info("Started Pulsar TLS Proxy on {}", listenChannelTls.localAddress());
         }
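
The difference between a port-only bind and an address-plus-port bind can be shown with plain JDK sockets. This is a minimal sketch, not the proxy's Netty code: the class `BindAddressSketch` and the helper `bindAndReport` are hypothetical names introduced here, and `java.net.ServerSocket` stands in for the `ServerBootstrap` used in the patch. Passing only a port yields the wildcard address, which is the behaviour the servicePort had before this change.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class BindAddressSketch {

    /**
     * Binds a server socket and returns the local address it ended up on.
     * A null bindAddress models the old port-only call (wildcard address);
     * port 0 asks the OS for any free port. Hypothetical helper, not Pulsar API.
     */
    static InetAddress bindAndReport(String bindAddress, int port) {
        InetSocketAddress target = bindAddress == null
                ? new InetSocketAddress(port)               // all interfaces
                : new InetSocketAddress(bindAddress, port); // one specific interface
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(target);
            return socket.getInetAddress();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("port only:      " + bindAndReport(null, 0));
        System.out.println("address + port: " + bindAndReport("127.0.0.1", 0));
    }
}
```

With `bindAddress=0.0.0.0` (the default added to `conf/proxy.conf`) both forms behave the same; the change only matters once a specific address is configured.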
 


[pulsar] 03/03: Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56680581a1212f39b84a00717142b4062fa8de5d
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 29 09:43:20 2020 +0100

    Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)
    
    If a non-persistent topic is unused it is automatically deleted by Pulsar. If you then get the stats on that topic name using the REST API, it causes that topic to re-appear.
    
    For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint:
    
    ```
    ["non-persistent://public/bob/np"]
    ```
    
    Since this topic is unused, it gets cleaned up and is no longer returned by the endpoint:
    
    ```
    []
    ```
    
    However, if you request the stats for that topic using the CLI (which calls the REST API), like this, you actually get a response (not a 404):
    
    ```
    bin/pulsar-admin topics stats non-persistent://public/bob/np
    Warning: Nashorn engine is planned to be removed from a future JDK release
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 0,
      "msgInCounter" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 0,
      "backlogSize" : 0,
      "publishers" : [ ],
      "subscriptions" : { },
      "replication" : { }
    }
    ```
    
    And now the topic re-appears on the topic-list endpoint:
    
    ```
    ["non-persistent://public/bob/np"]
    ```
    
    ### Modifications
    When loading a non-persistent (temporary) topic with `createIfMissing = false`, do not try to create it; simply return an empty value.
    Add a test case.
    
    This change added tests and can be verified as in the bug description.
    
    To verify manually:
    1. Run `pulsar-admin topics create non-persistent://public/default/tmp`.
    2. Wait for the topic to be deleted.
    3. Run `pulsar-admin topics stats non-persistent://public/default/tmp` and check that the topic does not re-appear.
    
    (cherry picked from commit b860c059eb4d13969469b23c9a3bffd6bf7e5a66)
---
 .../pulsar/broker/service/BrokerService.java       | 17 ++++--
 .../broker/service/NonPersistentTopicE2ETest.java  | 66 +++++++++++++++++++++-
 2 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5348397..32bf07e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -759,10 +759,19 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 }
             }
             final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
-            return topics.computeIfAbsent(topic, (topicName) -> {
-                    return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
-                        : createNonPersistentTopic(topicName);
-            });
+            if (isPersistentTopic) {
+                return topics.computeIfAbsent(topic, (topicName) -> {
+                    return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
+                });
+            } else {
+                return topics.computeIfAbsent(topic, (topicName) -> {
+                    if (createIfMissing) {
+                        return createNonPersistentTopic(topicName);
+                    } else {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                });
+            }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topic, e);
             return failedFuture(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1bbb20f..fab7a5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -37,6 +37,8 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -48,6 +50,8 @@ import static org.testng.Assert.assertTrue;
 
 public class NonPersistentTopicE2ETest extends BrokerTestBase {
 
+    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class);
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -102,8 +106,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         assertFalse(topic.isPresent());
         assertFalse(topicHasSchema(topicName));
 
-        // 2. Topic is not GCed with live connection
-        topicName = "non-persistent://prop/ns-abc/topic-2";
+        // 1a. Topic that add/removes subscription can be GC'd
+        topicName = "non-persistent://prop/ns-abc/topic-1a";
         String subName = "sub1";
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
         topic = getTopic(topicName);
@@ -111,6 +115,23 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         topic.get().addSchema(schemaData).join();
         assertTrue(topicHasSchema(topicName));
 
+        admin.topics().deleteSubscription(topicName, subName);
+        consumer.close();
+
+        runGC();
+        topic = getTopic(topicName);
+        assertFalse(topic.isPresent());
+        assertFalse(topicHasSchema(topicName));
+
+        // 2. Topic is not GCed with live connection
+        topicName = "non-persistent://prop/ns-abc/topic-2";
+        subName = "sub1";
+        consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        topic = getTopic(topicName);
+        assertTrue(topic.isPresent());
+        topic.get().addSchema(schemaData).join();
+        assertTrue(topicHasSchema(topicName));
+
         runGC();
         topic = getTopic(topicName);
         assertTrue(topic.isPresent());
@@ -170,4 +191,45 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         producer2.close();
     }
 
+    @Test
+    public void testGC() throws Exception {
+        // 1. Simple successful GC
+        String topicName = "non-persistent://prop/ns-abc/topic-10";
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        runGC();
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 2. Topic is not GCed with live connection
+        String subName = "sub1";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+        runGC();
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 3. Topic with subscription is not GCed even with no connections
+        consumer.close();
+
+        runGC();
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 4. Topic can be GCed after unsubscribe
+        admin.topics().deleteSubscription(topicName, subName);
+
+        runGC();
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        // 5. Get the topic and make sure it doesn't come back
+        admin.lookups().lookupTopic(topicName);
+        Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
+        assertFalse(topic.isPresent());
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // write again, the topic will be available
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+        producer2.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+    }
 }
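
The lookup behaviour this change is after can be sketched in isolation. This is a simplified model under stated assumptions, not the broker's code: `TopicLookupSketch`, `garbageCollect` and `isRegistered` are hypothetical names, and a `String` stands in for the broker's `Topic` object. Unlike the patch, which calls `computeIfAbsent` in both branches, the sketch uses a plain `get` for the read-only case so that a lookup with `createIfMissing = false` registers nothing at all.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class TopicLookupSketch {
    // Hypothetical registry; a String stands in for the broker's Topic object.
    private final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> topics =
            new ConcurrentHashMap<>();

    /** Returns the topic if registered; creates it only when createIfMissing is true. */
    CompletableFuture<Optional<String>> getTopic(String name, boolean createIfMissing) {
        if (createIfMissing) {
            return topics.computeIfAbsent(name,
                    n -> CompletableFuture.completedFuture(Optional.of("topic-object-for-" + n)));
        }
        // Read-only lookup (e.g. a stats request): never registers a new entry.
        CompletableFuture<Optional<String>> existing = topics.get(name);
        return existing != null ? existing : CompletableFuture.completedFuture(Optional.empty());
    }

    /** Models the broker removing an unused non-persistent topic. */
    void garbageCollect(String name) {
        topics.remove(name);
    }

    boolean isRegistered(String name) {
        return topics.containsKey(name);
    }

    public static void main(String[] args) {
        TopicLookupSketch broker = new TopicLookupSketch();
        String name = "non-persistent://public/bob/np";
        broker.getTopic(name, true).join();   // producer connects: topic is created
        broker.garbageCollect(name);          // unused topic is cleaned up
        System.out.println("stats lookup finds topic: " + broker.getTopic(name, false).join().isPresent());
        System.out.println("topic re-appeared: " + broker.isRegistered(name));
    }
}
```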


[pulsar] 02/03: [connector]fix debezium-connector error log (#9063)

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 10f674b65db43ccb738010f9e20565b5f4c63b9b
Author: wangyufan <wa...@gmail.com>
AuthorDate: Tue Dec 29 13:46:23 2020 +0800

    [connector]fix debezium-connector error log (#9063)
    
    ### Motivation
    
    The debezium-connector exception message is not clear.
    
    ### Modifications
    
    When the debezium-connector throws an exception, the following code never reaches `log.error`, because `getRecordSequence()` is empty and calling `get()` on it throws:
    ```
    // getRecordSequence is empty
    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get());
    log.error(errorMsg);
    ```
    
    After the fix:
    ```
    17:42:43,274 ERROR [public/default/debezium-mysql-source-exchange-0] [instance: 0] PulsarSink - Failed to publish to topic [public/default/xx.xx.xx] with error [org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes]
    ```
    
    (cherry picked from commit 19ff4c3f44d30d2f61fa34b81d1160b524f6b7b3)
---
 .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index d8a1de9..7939eb5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -191,11 +191,13 @@ public class PulsarSink<T> implements Sink<T> {
                 String errorMsg = null;
                 if (srcRecord instanceof PulsarRecord) {
                     errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId());
-                    log.error(errorMsg);
                 } else {
-                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get());
-                    log.error(errorMsg);
+                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s]", topic, throwable.getMessage());
+                    if (record.getRecordSequence().isPresent()) {
+                        errorMsg = String.format(errorMsg + " with src sequence id [%s]", record.getRecordSequence().get());
+                    }
                 }
+                log.error(errorMsg);
                 stats.incrSinkExceptions(new Exception(errorMsg));
                 return null;
             };
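
The message-building logic of this fix can be exercised on its own. The following is a minimal sketch, assuming the record sequence is an `Optional<Long>`; `SinkErrorMessageSketch` and `buildErrorMessage` are hypothetical names, not part of `PulsarSink`. It appends the sequence id only when one is present, so an empty `Optional` no longer prevents the error from being logged.

```java
import java.util.Optional;

final class SinkErrorMessageSketch {

    /** Builds the publish-failure message; the sequence id suffix is optional. */
    static String buildErrorMessage(String topic, String error, Optional<Long> recordSequence) {
        StringBuilder msg = new StringBuilder(
                String.format("Failed to publish to topic [%s] with error [%s]", topic, error));
        // Append by concatenation rather than reusing the message as a format string.
        recordSequence.ifPresent(seq -> msg.append(" with src sequence id [").append(seq).append(']'));
        return msg.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildErrorMessage("public/default/t", "Message size is too big", Optional.empty()));
        System.out.println(buildErrorMessage("public/default/t", "Message size is too big", Optional.of(42L)));
    }
}
```

One design difference from the patch: the sketch appends the suffix with a `StringBuilder` instead of passing the already formatted message back into `String.format`, which would misbehave if the error text itself contained a `%` character.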