You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/27 10:35:28 UTC
[pulsar] branch branch-2.8 updated (b817fcd -> ad25379)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.
from b817fcd revert PR 11594 to avoid copy data to direct buffer afer aircompressor upgrade to 0.20 (#11792)
new 48892be Fix unnecessary user interactions in building pulsar-standalone image (#11623)
new ed758ab fix generate javadoc for kafka-connect-adaptor failed (#11807)
new eed171d Producer getting producer busy is removing existing producer from list (#11804)
new ad25379 fix flaky test testReacquireLocksAfterSessionLost (#11815)
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
docker/pulsar-standalone/Dockerfile | 2 +
.../org/apache/pulsar/broker/service/Producer.java | 5 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 18 +++++++++
.../broker/service/PersistentTopicE2ETest.java | 45 ++++++++++++++++++++++
pulsar-io/kafka-connect-adaptor/pom.xml | 6 +++
.../org/apache/pulsar/metadata/ZKSessionTest.java | 2 +-
6 files changed, 76 insertions(+), 2 deletions(-)
[pulsar] 04/04: fix flaky test testReacquireLocksAfterSessionLost
(#11815)
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ad253798dbe9eccfc5dcb53fec9b20f81781a884
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Aug 27 18:07:20 2021 +0800
fix flaky test testReacquireLocksAfterSessionLost (#11815)
### Motivation
Then run ZKSessionTest#testReacquireLocksAfterSessionLost, it will fail occasionally.
Error: testReacquireLocksAfterSessionLost(org.apache.pulsar.metadata.ZKSessionTest) Time elapsed: 12.974 s <<< FAILURE!
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a lambda expression in org.apache.pulsar.metadata.ZKSessionTest that uses org.apache.pulsar.metadata.api.coordination.ResourceLock expected [false] but found [true] within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:679)
at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLocksAfterSessionLost(ZKSessionTest.java:130)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.AssertionError: expected [false] but found [true]
at org.testng.Assert.fail(Assert.java:99)
at org.testng.Assert.failNotEquals(Assert.java:1037)
at org.testng.Assert.assertFalse(Assert.java:67)
at org.testng.Assert.assertFalse(Assert.java:77)
at org.apache.pulsar.metadata.ZKSessionTest.lambda$testReacquireLocksAfterSessionLost$0(ZKSessionTest.java:131)
at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:222)
at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:209)
... 4 more
### Modification
Change the waiting time from default 10s to 30s.
(cherry picked from commit adf1007cca44e1f4dbd8b3a0d4cfaa9665898439)
---
.../src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 24c4bd7..7a0a006 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -130,7 +130,7 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertFalse(lock.getLockExpiredFuture().isDone());
});
[pulsar] 03/04: Producer getting producer busy is removing existing
producer from list (#11804)
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eed171d759b5935e9bc13914838ba0e329dccad2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 27 02:18:41 2021 -0700
Producer getting producer busy is removing existing producer from list (#11804)
### Motivation
When a producer is getting error because of ProducerBusy (existing producer with the same name), it will trigger a producer close operation that will eventually lead to the existing producer getting removed from the topic map (even though that producer is still writing on the topic).
The problem is the producer close is triggering a removal from the map:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
Line 683 in 43ded59
if (producers.remove(producer.getProducerName(), producer)) {
Even though we check for producer equality, the Producer.equals() is only comparing the producer name, so the old instance gets removed from the map.
Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection.
* Producer getting producer busy is removing existing producer from list
* Fixed test
(cherry picked from commit 6aef83f3b77c343b9ea3edc1c07dbaf6bac9bd59)
---
.../org/apache/pulsar/broker/service/Producer.java | 5 ++-
.../apache/pulsar/broker/service/ServerCnx.java | 18 +++++++++
.../broker/service/PersistentTopicE2ETest.java | 45 ++++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 8c35e66..12697f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -147,7 +147,10 @@ public class Producer {
public boolean equals(Object obj) {
if (obj instanceof Producer) {
Producer other = (Producer) obj;
- return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic);
+ return Objects.equals(producerName, other.producerName)
+ && Objects.equals(topic, other.topic)
+ && producerId == other.producerId
+ && Objects.equals(cnx, other.cnx);
}
return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6a5ee25..84bba9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -41,6 +41,7 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -1531,6 +1532,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ServerCnx other = (ServerCnx) o;
+ return Objects.equals(ctx().channel().id(), other.ctx().channel().id());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ctx().channel().id());
+ }
+
+ @Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
checkArgument(state == State.Connected);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index ce48402..c39c692 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1786,4 +1786,49 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertEquals(msg.getValue(), "test");
assertEquals(msg.getEventTime(), 5);
}
+
+ @Test
+ public void testProducerBusy() throws Exception {
+ final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .producerName("xxx")
+ .create();
+
+ assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+
+ for (int i =0; i < 5; i++) {
+ try {
+ pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .producerName("xxx")
+ .create();
+ fail("Should have failed");
+ } catch (ProducerBusyException e) {
+ // Expected
+ }
+
+ assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+ }
+
+ // Try from different connection
+ @Cleanup
+ PulsarClient client2 = PulsarClient.builder()
+ .serviceUrl(getPulsar().getBrokerServiceUrl())
+ .build();
+
+ try {
+ client2.newProducer(Schema.STRING)
+ .topic(topicName)
+ .producerName("xxx")
+ .create();
+ fail("Should have failed");
+ } catch (ProducerBusyException e) {
+ // Expected
+ }
+
+ assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
+ }
}
[pulsar] 01/04: Fix unnecessary user interactions in building
pulsar-standalone image (#11623)
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 48892bed5eb0efe9c2478dc446c8073b08fbf91e
Author: Ting Yuan <yu...@ict.ac.cn>
AuthorDate: Wed Aug 11 03:14:11 2021 +0800
Fix unnecessary user interactions in building pulsar-standalone image (#11623)
(cherry picked from commit 18ecf3c61e841632fab06fb5e0420b6d84b3baee)
---
docker/pulsar-standalone/Dockerfile | 2 ++
1 file changed, 2 insertions(+)
diff --git a/docker/pulsar-standalone/Dockerfile b/docker/pulsar-standalone/Dockerfile
index 01718e4..cabbb7d 100644
--- a/docker/pulsar-standalone/Dockerfile
+++ b/docker/pulsar-standalone/Dockerfile
@@ -26,6 +26,8 @@ FROM apachepulsar/pulsar-dashboard:latest as dashboard
# Restart from
FROM ubuntu:20.04
+ARG DEBIAN_FRONTEND=noninteractive
+
# Note that the libpq-dev package is needed here in order to install
# the required python psycopg2 package (for postgresql) later
RUN apt-get update \
[pulsar] 02/04: fix generate javadoc for kafka-connect-adaptor
failed (#11807)
Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ed758ab8b590d1e54990531468a9d1f9d479e828
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Aug 27 09:48:58 2021 +0800
fix generate javadoc for kafka-connect-adaptor failed (#11807)
(cherry picked from commit 6aa44a0e7cfed161da8db35e6b50c9f3a595629e)
---
pulsar-io/kafka-connect-adaptor/pom.xml | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index c23e4f5..bed59f6 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -113,5 +113,11 @@
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>