You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/06 12:30:16 UTC

[pulsar] branch master updated: [fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 24a38ea2b16 [fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)
24a38ea2b16 is described below

commit 24a38ea2b16f566fbe4b3eaec1f364b75d79b09f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Oct 6 07:30:06 2022 -0500

    [fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)
    
    - fix thread safety issues with SimpleConsumerSocket and SimpleProducerSocket
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   |  2 +-
 .../websocket/proxy/SimpleConsumerSocket.java      | 25 ++++++++++++----------
 .../websocket/proxy/SimpleProducerSocket.java      | 19 ++++++++--------
 .../proxy/v1/V1_ProxyAuthenticationTest.java       | 10 ++++-----
 4 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 7abdc643462..c89c5397c87 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -968,7 +968,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
             Awaitility.await().untilAsserted(() ->
                     assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
 
-            for (JsonObject msg : consumeSocket.messages) {
+            for (JsonObject msg : consumeSocket.getMessages()) {
                 assertTrue(msg.has("encryptionContext"));
                 JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
                 JsonObject keys = encryptionCtx.getAsJsonObject("keys");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index b1a9908d723..8e5f20afe67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -18,23 +18,23 @@
  */
 package org.apache.pulsar.websocket.proxy;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
-import com.google.gson.Gson;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,16 +43,16 @@ public class SimpleConsumerSocket {
     private static final String X_PULSAR_MESSAGE_ID = "messageId";
     private final CountDownLatch closeLatch;
     private Session session;
-    private final ArrayList<String> consumerBuffer;
-    final ArrayList<JsonObject> messages;
+    private final List<String> consumerBuffer;
+    private final List<JsonObject> messages;
     private final AtomicInteger receivedMessages = new AtomicInteger();
     // Custom message handler to override standard message processing, if it's needed
     private SimpleConsumerMessageHandler customMessageHandler;
 
     public SimpleConsumerSocket() {
         this.closeLatch = new CountDownLatch(1);
-        consumerBuffer = new ArrayList<>();
-        this.messages = new ArrayList<>();
+        consumerBuffer = Collections.synchronizedList(new ArrayList<>());
+        this.messages = Collections.synchronizedList(new ArrayList<>());
     }
 
     public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
@@ -125,7 +125,7 @@ public class SimpleConsumerSocket {
         return this.session;
     }
 
-    public synchronized ArrayList<String> getBuffer() {
+    public List<String> getBuffer() {
         return consumerBuffer;
     }
 
@@ -135,4 +135,7 @@ public class SimpleConsumerSocket {
 
     private static final Logger log = LoggerFactory.getLogger(SimpleConsumerSocket.class);
 
+    public List<JsonObject> getMessages() {
+        return messages;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
index 0d15e56d56c..04b24d5be2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
@@ -19,12 +19,16 @@
 package org.apache.pulsar.websocket.proxy;
 
 import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
 import org.eclipse.jetty.websocket.api.Session;
@@ -35,17 +39,12 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-
 @WebSocket(maxTextMessageSize = 64 * 1024)
 public class SimpleProducerSocket {
 
     private final CountDownLatch closeLatch;
     private Session session;
-    private final ArrayList<String> producerBuffer;
+    private final List<String> producerBuffer;
     private final int messagesToSendWhenConnected;
 
     public SimpleProducerSocket() {
@@ -54,7 +53,7 @@ public class SimpleProducerSocket {
 
     public SimpleProducerSocket(int messagesToSendWhenConnected) {
         this.closeLatch = new CountDownLatch(1);
-        this.producerBuffer = new ArrayList<>();
+        this.producerBuffer = Collections.synchronizedList(new ArrayList<>());
         this.messagesToSendWhenConnected = messagesToSendWhenConnected;
     }
 
@@ -103,7 +102,7 @@ public class SimpleProducerSocket {
         return this.session;
     }
 
-    public synchronized ArrayList<String> getBuffer() {
+    public List<String> getBuffer() {
         return producerBuffer;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 09a7d3a90fd..4c4be4b68e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
 import org.apache.pulsar.websocket.service.ProxyServer;
 import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
 import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
 import org.eclipse.jetty.websocket.client.WebSocketClient;
@@ -142,11 +143,10 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
         Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
         Assert.assertTrue(consumerFuture.get().isOpen());
         Assert.assertTrue(producerFuture.get().isOpen());
-
-        consumeSocket.awaitClose(1, TimeUnit.SECONDS);
-        produceSocket.awaitClose(1, TimeUnit.SECONDS);
-        Assert.assertTrue(produceSocket.getBuffer().size() > 0);
-        Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(produceSocket.getBuffer().size() > 0);
+            Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+        });
     }
 
     @Test(timeOut = 10000)