You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/17 20:36:54 UTC
[incubator-pulsar] branch master updated: [tests] Fix the
synchronization problem at
BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 259275b [tests] Fix the synchronization problem at BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
259275b is described below
commit 259275ba8df5750e64fa15781dc2a968959ce71f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 17 13:36:50 2018 -0700
[tests] Fix the synchronization problem at BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
*Motivation*
The following exception was observed in one of the CI jobs.
```
java.lang.NullPointerException
at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:44)
at org.apache.pulsar.client.impl.BrokerClientIntegrationTest.testMaxConcurrentTopicLoading(BrokerClientIntegrationTest.java:601)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
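The problem seems to come from improper synchronization in the test: the executor tasks add to the shared `futures` list concurrently without any locking, which can presumably leave null entries in the list that `FutureUtil.waitForAll` then dereferences.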
*Changes*
Fix the synchronization problem in BrokerClientIntegrationTest.testMaxConcurrentTopicLoading
---
.../pulsar/client/impl/BrokerClientIntegrationTest.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index bf65fdd..0e47dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -583,7 +583,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
- List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
+ final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
final int totalProducers = 10;
CountDownLatch latch = new CountDownLatch(totalProducers);
for (int i = 0; i < totalProducers; i++) {
@@ -591,14 +591,18 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
final String randomTopicName1 = topicName + randomUUID().toString();
final String randomTopicName2 = topicName + randomUUID().toString();
// pass producer-name to avoid exception: producer is already connected to topic
- futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
- futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+ synchronized (futures) {
+ futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
+ futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+ }
latch.countDown();
});
}
latch.await();
- FutureUtil.waitForAll(futures).get();
+ synchronized (futures) {
+ FutureUtil.waitForAll(futures).get();
+ }
pulsarClient.close();
pulsarClient2.close();
} finally {