You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/10/06 12:30:16 UTC
[pulsar] branch master updated: [fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 24a38ea2b16 [fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)
24a38ea2b16 is described below
commit 24a38ea2b16f566fbe4b3eaec1f364b75d79b09f
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Oct 6 07:30:06 2022 -0500
[fix][tests] Fix flaky test V1_ProxyAuthenticationTest.anonymousSocketTest (#17934)
- fix thread-safety issues with SimpleConsumerSocket and SimpleProducerSocket
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 2 +-
.../websocket/proxy/SimpleConsumerSocket.java | 25 ++++++++++++----------
.../websocket/proxy/SimpleProducerSocket.java | 19 ++++++++--------
.../proxy/v1/V1_ProxyAuthenticationTest.java | 10 ++++-----
4 files changed, 29 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 7abdc643462..c89c5397c87 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -968,7 +968,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
- for (JsonObject msg : consumeSocket.messages) {
+ for (JsonObject msg : consumeSocket.getMessages()) {
assertTrue(msg.has("encryptionContext"));
JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
JsonObject keys = encryptionCtx.getAsJsonObject("keys");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index b1a9908d723..8e5f20afe67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -18,23 +18,23 @@
*/
package org.apache.pulsar.websocket.proxy;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
-import com.google.gson.Gson;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,16 +43,16 @@ public class SimpleConsumerSocket {
private static final String X_PULSAR_MESSAGE_ID = "messageId";
private final CountDownLatch closeLatch;
private Session session;
- private final ArrayList<String> consumerBuffer;
- final ArrayList<JsonObject> messages;
+ private final List<String> consumerBuffer;
+ private final List<JsonObject> messages;
private final AtomicInteger receivedMessages = new AtomicInteger();
// Custom message handler to override standard message processing, if it's needed
private SimpleConsumerMessageHandler customMessageHandler;
public SimpleConsumerSocket() {
this.closeLatch = new CountDownLatch(1);
- consumerBuffer = new ArrayList<>();
- this.messages = new ArrayList<>();
+ consumerBuffer = Collections.synchronizedList(new ArrayList<>());
+ this.messages = Collections.synchronizedList(new ArrayList<>());
}
public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
@@ -125,7 +125,7 @@ public class SimpleConsumerSocket {
return this.session;
}
- public synchronized ArrayList<String> getBuffer() {
+ public List<String> getBuffer() {
return consumerBuffer;
}
@@ -135,4 +135,7 @@ public class SimpleConsumerSocket {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerSocket.class);
+ public List<JsonObject> getMessages() {
+ return messages;
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
index 0d15e56d56c..04b24d5be2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleProducerSocket.java
@@ -19,12 +19,16 @@
package org.apache.pulsar.websocket.proxy;
import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
@@ -35,17 +39,12 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-
@WebSocket(maxTextMessageSize = 64 * 1024)
public class SimpleProducerSocket {
private final CountDownLatch closeLatch;
private Session session;
- private final ArrayList<String> producerBuffer;
+ private final List<String> producerBuffer;
private final int messagesToSendWhenConnected;
public SimpleProducerSocket() {
@@ -54,7 +53,7 @@ public class SimpleProducerSocket {
public SimpleProducerSocket(int messagesToSendWhenConnected) {
this.closeLatch = new CountDownLatch(1);
- this.producerBuffer = new ArrayList<>();
+ this.producerBuffer = Collections.synchronizedList(new ArrayList<>());
this.messagesToSendWhenConnected = messagesToSendWhenConnected;
}
@@ -103,7 +102,7 @@ public class SimpleProducerSocket {
return this.session;
}
- public synchronized ArrayList<String> getBuffer() {
+ public List<String> getBuffer() {
return producerBuffer;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 09a7d3a90fd..4c4be4b68e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.websocket.proxy.SimpleProducerSocket;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@@ -142,11 +143,10 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());
-
- consumeSocket.awaitClose(1, TimeUnit.SECONDS);
- produceSocket.awaitClose(1, TimeUnit.SECONDS);
- Assert.assertTrue(produceSocket.getBuffer().size() > 0);
- Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(produceSocket.getBuffer().size() > 0);
+ Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+ });
}
@Test(timeOut = 10000)