You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/14 21:56:09 UTC

[pulsar] 02/03: Fix the unit tests for the websocket and run tests under websocket group (#10921)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa5c5d4d62d31123c03290953af827ec4c94b79e
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 15 05:49:52 2021 +0800

    Fix the unit tests for the websocket and run tests under websocket group (#10921)
---
 build/run_unit_group.sh                              |  2 +-
 .../pulsar/websocket/proxy/SimpleConsumerSocket.java | 20 ++++++++++++--------
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 9c24774..00b3fb0 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -42,7 +42,7 @@ function broker_group_1() {
 }
 
 function broker_group_2() {
-  $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,other'
+  $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other'
 }
 
 function broker_client_api() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index f6917cc..749bfdcd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -79,15 +79,19 @@ public class SimpleConsumerSocket {
     public synchronized void onMessage(String msg) throws JsonParseException, IOException {
         receivedMessages.incrementAndGet();
         JsonObject message = new Gson().fromJson(msg, JsonObject.class);
-        String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
-        consumerBuffer.add(messageId);
-        if (customMessageHandler != null) {
-            this.getRemote().sendString(customMessageHandler.handle(messageId, message));
+        if (message.get(X_PULSAR_MESSAGE_ID) != null) {
+            String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+            consumerBuffer.add(messageId);
+            if (customMessageHandler != null) {
+                this.getRemote().sendString(customMessageHandler.handle(messageId, message));
+            } else {
+                JsonObject ack = new JsonObject();
+                ack.add("messageId", new JsonPrimitive(messageId));
+                // Acking the proxy
+                this.getRemote().sendString(ack.toString());
+            }
         } else {
-            JsonObject ack = new JsonObject();
-            ack.add("messageId", new JsonPrimitive(messageId));
-            // Acking the proxy
-            this.getRemote().sendString(ack.toString());
+            consumerBuffer.add(message.toString());
         }
     }