You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/14 21:56:09 UTC
[pulsar] 02/03: Fix the unit tests for the websocket and run tests
under websocket group (#10921)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa5c5d4d62d31123c03290953af827ec4c94b79e
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 15 05:49:52 2021 +0800
Fix the unit tests for the websocket and run tests under websocket group (#10921)
---
build/run_unit_group.sh | 2 +-
.../pulsar/websocket/proxy/SimpleConsumerSocket.java | 20 ++++++++++++--------
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 9c24774..00b3fb0 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -42,7 +42,7 @@ function broker_group_1() {
}
function broker_group_2() {
- $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,other'
+ $MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='schema,utils,functions-worker,broker-io,broker-discovery,broker-compaction,broker-naming,websocket,other'
}
function broker_client_api() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index f6917cc..749bfdcd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -79,15 +79,19 @@ public class SimpleConsumerSocket {
public synchronized void onMessage(String msg) throws JsonParseException, IOException {
receivedMessages.incrementAndGet();
JsonObject message = new Gson().fromJson(msg, JsonObject.class);
- String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
- consumerBuffer.add(messageId);
- if (customMessageHandler != null) {
- this.getRemote().sendString(customMessageHandler.handle(messageId, message));
+ if (message.get(X_PULSAR_MESSAGE_ID) != null) {
+ String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
+ consumerBuffer.add(messageId);
+ if (customMessageHandler != null) {
+ this.getRemote().sendString(customMessageHandler.handle(messageId, message));
+ } else {
+ JsonObject ack = new JsonObject();
+ ack.add("messageId", new JsonPrimitive(messageId));
+ // Acking the proxy
+ this.getRemote().sendString(ack.toString());
+ }
} else {
- JsonObject ack = new JsonObject();
- ack.add("messageId", new JsonPrimitive(messageId));
- // Acking the proxy
- this.getRemote().sendString(ack.toString());
+ consumerBuffer.add(message.toString());
}
}