You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/12 19:15:30 UTC
[pulsar] branch master updated: Modify producer handler of WebSocket to send ack to client asynchronously (#3172)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bda5b6 Modify producer handler of WebSocket to send ack to client asynchronously (#3172)
7bda5b6 is described below
commit 7bda5b60f8bf27af155b2a490dcf7ed926e86029
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Thu Dec 13 04:15:25 2018 +0900
Modify producer handler of WebSocket to send ack to client asynchronously (#3172)
---
.../org/apache/pulsar/websocket/ProducerHandler.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index bca9762..338c0f1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
+import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,7 +244,20 @@ public class ProducerHandler extends AbstractWebSocketHandler {
private void sendAckResponse(ProducerAck response) {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
- getSession().getRemote().sendString(msg);
+ getSession().getRemote().sendString(msg, new WriteCallback() {
+ @Override
+ public void writeFailed(Throwable th) {
+ log.warn("[{}] Failed to send ack {}", producer.getTopic(), th.getMessage(), th);
+ }
+
+ @Override
+ public void writeSuccess() {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ack was sent successfully to {}", producer.getTopic(),
+ getRemote().getInetSocketAddress().toString());
+ }
+ }
+ });
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate ack json-response {}", producer.getTopic(), e.getMessage(), e);
} catch (Exception e) {