You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/09/03 20:36:14 UTC
incubator-apex-malhar git commit: MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 a9bcce182 -> ef39a1b73
MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ef39a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ef39a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ef39a1b7
Branch: refs/heads/devel-3
Commit: ef39a1b73af4bb09024dc5286d891a68e95305ff
Parents: a9bcce1
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Sep 2 17:11:53 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Sep 3 10:10:16 2015 -0700
----------------------------------------------------------------------
.../java/com/datatorrent/lib/io/WebSocketInputOperator.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ef39a1b7/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 02b9ef2..94f8d97 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -154,6 +154,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
try {
sleep(1000);
if (connectionClosed && !WebSocketInputOperator.this.shutdown) {
+ connection.close();
WebSocketInputOperator.this.activate(null);
}
}
@@ -184,6 +185,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
}
}));
+
+ if (client != null) {
+ client.closeAsynchronously();
+ }
+
client = new AsyncHttpClient(config);
connection = client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener()
{