Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/27 10:35:28 UTC

[pulsar] branch branch-2.8 updated (b817fcd -> ad25379)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from b817fcd  revert PR 11594 to avoid copy data to direct buffer afer aircompressor upgrade to 0.20 (#11792)
     new 48892be  Fix unnecessary user interactions in building pulsar-standalone image (#11623)
     new ed758ab  fix generate javadoc for kafka-connect-adaptor failed (#11807)
     new eed171d  Producer getting producer busy is removing existing producer from list (#11804)
     new ad25379  fix flaky test testReacquireLocksAfterSessionLost (#11815)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/pulsar-standalone/Dockerfile                |  2 +
 .../org/apache/pulsar/broker/service/Producer.java |  5 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 18 +++++++++
 .../broker/service/PersistentTopicE2ETest.java     | 45 ++++++++++++++++++++++
 pulsar-io/kafka-connect-adaptor/pom.xml            |  6 +++
 .../org/apache/pulsar/metadata/ZKSessionTest.java  |  2 +-
 6 files changed, 76 insertions(+), 2 deletions(-)

[pulsar] 04/04: fix flaky test testReacquireLocksAfterSessionLost (#11815)

Posted by ch...@apache.org.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ad253798dbe9eccfc5dcb53fec9b20f81781a884
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Aug 27 18:07:20 2021 +0800

    fix flaky test testReacquireLocksAfterSessionLost (#11815)
    
    ### Motivation
    When running ZKSessionTest#testReacquireLocksAfterSessionLost, the test fails occasionally.
    
    Error:  testReacquireLocksAfterSessionLost(org.apache.pulsar.metadata.ZKSessionTest)  Time elapsed: 12.974 s  <<< FAILURE!
    org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a lambda expression in org.apache.pulsar.metadata.ZKSessionTest that uses org.apache.pulsar.metadata.api.coordination.ResourceLock expected [false] but found [true] within 10 seconds.
      at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
      at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
      at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
      at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
      at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:679)
      at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLocksAfterSessionLost(ZKSessionTest.java:130)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
      at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
      at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
      at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
      at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.AssertionError: expected [false] but found [true]
      at org.testng.Assert.fail(Assert.java:99)
      at org.testng.Assert.failNotEquals(Assert.java:1037)
      at org.testng.Assert.assertFalse(Assert.java:67)
      at org.testng.Assert.assertFalse(Assert.java:77)
      at org.apache.pulsar.metadata.ZKSessionTest.lambda$testReacquireLocksAfterSessionLost$0(ZKSessionTest.java:131)
      at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
      at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:222)
      at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:209)
      ... 4 more
    
    ### Modification
    Change the waiting time from default 10s to 30s.
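
The effect of the longer wait can be illustrated with a minimal, stdlib-only sketch of "poll until the assertion passes or the timeout elapses". This is not Awaitility's implementation; `AwaitSketch`, `untilAsserted`, and `passesWithin` are hypothetical names used only for illustration. The commit itself simply switches to `Awaitility.waitAtMost(30, TimeUnit.SECONDS)`.

```java
import java.time.Duration;

class AwaitSketch {
    /**
     * Re-runs the assertion until it passes or the timeout elapses.
     * On timeout, the last AssertionError is rethrown.
     * (Hypothetical helper, not part of Awaitility.)
     */
    static void untilAsserted(Duration timeout, Duration pollInterval, Runnable assertion)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // out of time: surface the last failure
                }
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Returns true if a condition that becomes ready after readyAfterMillis passes within the timeout. */
    static boolean passesWithin(Duration timeout, long readyAfterMillis) throws InterruptedException {
        long readyAt = System.nanoTime() + readyAfterMillis * 1_000_000L;
        try {
            untilAsserted(timeout, Duration.ofMillis(10), () -> {
                if (System.nanoTime() - readyAt < 0) {
                    throw new AssertionError("not ready yet");
                }
            });
            return true;
        } catch (AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A condition that needs ~200 ms: a 50 ms budget is too short, a 2 s budget is enough.
        System.out.println("short timeout passes: " + passesWithin(Duration.ofMillis(50), 200));
        System.out.println("long timeout passes:  " + passesWithin(Duration.ofSeconds(2), 200));
    }
}
```

A larger timeout only raises the upper bound on the wait; when the condition is met early, the polling loop returns as soon as the assertion passes.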
    
    (cherry picked from commit adf1007cca44e1f4dbd8b3a0d4cfaa9665898439)
---
 .../src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java         | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 24c4bd7..7a0a006 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -130,7 +130,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
 
-        Awaitility.await().untilAsserted(() -> {
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
             assertFalse(lock.getLockExpiredFuture().isDone());
         });
 

[pulsar] 03/04: Producer getting producer busy is removing existing producer from list (#11804)

Posted by ch...@apache.org.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eed171d759b5935e9bc13914838ba0e329dccad2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 27 02:18:41 2021 -0700

    Producer getting producer busy is removing existing producer from list (#11804)
    
    ### Motivation
    When a producer gets a ProducerBusy error (an existing producer already uses the same name), it triggers a producer close operation that eventually removes the existing producer from the topic map, even though that producer is still writing to the topic.
    
    The problem is that the producer close triggers a removal from the map
    (pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java, line 683 in 43ded59):
    
        if (producers.remove(producer.getProducerName(), producer)) {
    
    Even though we check for producer equality, Producer.equals() only compares the producer name and topic, so the old instance gets removed from the map.
    
    Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection.
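
The failure mode described above can be reproduced with a small stand-alone sketch. It assumes the producers map behaves like a `ConcurrentHashMap` keyed by producer name; `DemoProducer`, its `connectionId` field, and the `nameOnlyEquality` switch are hypothetical stand-ins for illustration, not the real broker classes. Topic comparison is omitted for brevity.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

class ProducerEqualityDemo {
    /**
     * Registers one producer, then runs the close path of a rejected duplicate
     * with the same name. Returns true if that close evicts the existing entry.
     */
    static boolean duplicateCloseEvictsExisting(boolean nameOnlyEquality) {
        ConcurrentHashMap<String, DemoProducer> producers = new ConcurrentHashMap<>();
        DemoProducer existing = new DemoProducer("xxx", "conn-1", 0L, nameOnlyEquality);
        producers.put(existing.name, existing);

        // Same name, different connection: this one was rejected as "busy".
        DemoProducer rejected = new DemoProducer("xxx", "conn-2", 0L, nameOnlyEquality);
        // remove(key, value) removes the entry only if the mapped value equals the given value.
        return producers.remove(rejected.name, rejected);
    }

    public static void main(String[] args) {
        System.out.println("name-only equality evicts existing producer: "
                + duplicateCloseEvictsExisting(true));
        System.out.println("connection-aware equality evicts existing producer: "
                + duplicateCloseEvictsExisting(false));
    }
}

// Hypothetical stand-in for the broker-side Producer.
final class DemoProducer {
    final String name;
    final String connectionId; // stands in for the identity of the connection (cnx)
    final long producerId;     // producer id within that connection
    final boolean nameOnlyEquality;

    DemoProducer(String name, String connectionId, long producerId, boolean nameOnlyEquality) {
        this.name = name;
        this.connectionId = connectionId;
        this.producerId = producerId;
        this.nameOnlyEquality = nameOnlyEquality;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof DemoProducer)) {
            return false;
        }
        DemoProducer other = (DemoProducer) obj;
        if (nameOnlyEquality) {
            // Old behaviour: two distinct producers with the same name compare equal.
            return Objects.equals(name, other.name);
        }
        // Fixed behaviour: also require the same producer id and the same connection.
        return Objects.equals(name, other.name)
                && producerId == other.producerId
                && Objects.equals(connectionId, other.connectionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
}
```

With name-only equality the rejected duplicate's close removes the live producer's entry (the first line prints `true`); once the connection and producer id take part in equality, the entry is left in place (the second line prints `false`).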
    
    * Producer getting producer busy is removing existing producer from list
    
    * Fixed test
    
    (cherry picked from commit 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59)
---
 .../org/apache/pulsar/broker/service/Producer.java |  5 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 18 +++++++++
 .../broker/service/PersistentTopicE2ETest.java     | 45 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 8c35e66..12697f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -147,7 +147,10 @@ public class Producer {
     public boolean equals(Object obj) {
         if (obj instanceof Producer) {
             Producer other = (Producer) obj;
-            return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic);
+            return Objects.equals(producerName, other.producerName)
+                    && Objects.equals(topic, other.topic)
+                    && producerId == other.producerId
+                    && Objects.equals(cnx, other.cnx);
         }
 
         return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6a5ee25..84bba9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1531,6 +1532,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ServerCnx other = (ServerCnx) o;
+        return Objects.equals(ctx().channel().id(), other.ctx().channel().id());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ctx().channel().id());
+    }
+
+    @Override
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         checkArgument(state == State.Connected);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index ce48402..c39c692 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1786,4 +1786,49 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         assertEquals(msg.getValue(), "test");
         assertEquals(msg.getEventTime(), 5);
     }
+
+    @Test
+    public void testProducerBusy() throws Exception {
+        final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .producerName("xxx")
+                .create();
+
+        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+
+        for (int i =0; i < 5; i++) {
+            try {
+                pulsarClient.newProducer(Schema.STRING)
+                        .topic(topicName)
+                        .producerName("xxx")
+                        .create();
+                fail("Should have failed");
+            } catch (ProducerBusyException e) {
+                // Expected
+            }
+
+            assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+        }
+
+        // Try from different connection
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder()
+                .serviceUrl(getPulsar().getBrokerServiceUrl())
+                .build();
+
+        try {
+            client2.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .producerName("xxx")
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerBusyException e) {
+            // Expected
+        }
+
+        assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+    }
 }

[pulsar] 01/04: Fix unnecessary user interactions in building pulsar-standalone image (#11623)

Posted by ch...@apache.org.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 48892bed5eb0efe9c2478dc446c8073b08fbf91e
Author: Ting Yuan <yu...@ict.ac.cn>
AuthorDate: Wed Aug 11 03:14:11 2021 +0800

    Fix unnecessary user interactions in building pulsar-standalone image (#11623)
    
    (cherry picked from commit 18ecf3c61e841632fab06fb5e0420b6d84b3baee)
---
 docker/pulsar-standalone/Dockerfile | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker/pulsar-standalone/Dockerfile b/docker/pulsar-standalone/Dockerfile
index 01718e4..cabbb7d 100644
--- a/docker/pulsar-standalone/Dockerfile
+++ b/docker/pulsar-standalone/Dockerfile
@@ -26,6 +26,8 @@ FROM apachepulsar/pulsar-dashboard:latest as dashboard
 # Restart from
 FROM ubuntu:20.04
 
+ARG DEBIAN_FRONTEND=noninteractive
+
 # Note that the libpq-dev package is needed here in order to install
 # the required python psycopg2 package (for postgresql) later
 RUN apt-get update \

[pulsar] 02/04: fix generate javadoc for kafka-connect-adaptor failed (#11807)

Posted by ch...@apache.org.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ed758ab8b590d1e54990531468a9d1f9d479e828
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Aug 27 09:48:58 2021 +0800

    fix generate javadoc for kafka-connect-adaptor failed (#11807)
    
    (cherry picked from commit 6aa44a0e7cfed161da8db35e6b50c9f3a595629e)
---
 pulsar-io/kafka-connect-adaptor/pom.xml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index c23e4f5..bed59f6 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -113,5 +113,11 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
 </project>