You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/17 20:36:54 UTC

[incubator-pulsar] branch master updated: [tests] Fix the synchronization problem at BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 259275b  [tests] Fix the synchronization problem at BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
259275b is described below

commit 259275ba8df5750e64fa15781dc2a968959ce71f
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 17 13:36:50 2018 -0700

    [tests] Fix the synchronization problem at BrokerClientIntegrationTest.testMaxConcurrentTopicLoading (#2595)
    
    *Motivation*
    
    The following exception was observed in one of the CI jobs.
    
    ```
    java.lang.NullPointerException
    	at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:44)
    	at org.apache.pulsar.client.impl.BrokerClientIntegrationTest.testMaxConcurrentTopicLoading(BrokerClientIntegrationTest.java:601)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
    	at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
    
    The problem appears to come from improper synchronization in the test: the executor tasks add to the shared `futures` list concurrently without any locking.
    
    *Changes*
    
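    Fix the synchronization problem in BrokerClientIntegrationTest.testMaxConcurrentTopicLoading by guarding both the additions to the `futures` list and the final wait on it with `synchronized (futures)`.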
---
 .../pulsar/client/impl/BrokerClientIntegrationTest.java      | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index bf65fdd..0e47dfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -583,7 +583,7 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
             ClientCnx cnx = producer.cnx();
             assertTrue(cnx.channel().isActive());
             ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
-            List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
+            final List<CompletableFuture<Producer<byte[]>>> futures = Lists.newArrayList();
             final int totalProducers = 10;
             CountDownLatch latch = new CountDownLatch(totalProducers);
             for (int i = 0; i < totalProducers; i++) {
@@ -591,14 +591,18 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
                     final String randomTopicName1 = topicName + randomUUID().toString();
                     final String randomTopicName2 = topicName + randomUUID().toString();
                     // pass producer-name to avoid exception: producer is already connected to topic
-                    futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
-                    futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+                    synchronized (futures) {
+                        futures.add(pulsarClient2.newProducer().topic(randomTopicName1).createAsync());
+                        futures.add(pulsarClient.newProducer().topic(randomTopicName2).createAsync());
+                    }
                     latch.countDown();
                 });
             }
 
             latch.await();
-            FutureUtil.waitForAll(futures).get();
+            synchronized (futures) {
+                FutureUtil.waitForAll(futures).get();
+            }
             pulsarClient.close();
             pulsarClient2.close();
         } finally {