You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@apex.apache.org by da...@apache.org on 2015/09/03 20:36:14 UTC

incubator-apex-malhar git commit: MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 a9bcce182 -> ef39a1b73


MLHR-1835 #resolve The AsyncHttpClient and WebSocket were not being closed on reconnect.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ef39a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ef39a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ef39a1b7

Branch: refs/heads/devel-3
Commit: ef39a1b73af4bb09024dc5286d891a68e95305ff
Parents: a9bcce1
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Wed Sep 2 17:11:53 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Sep 3 10:10:16 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/lib/io/WebSocketInputOperator.java    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ef39a1b7/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
index 02b9ef2..94f8d97 100644
--- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java
@@ -154,6 +154,7 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         try {
           sleep(1000);
           if (connectionClosed && !WebSocketInputOperator.this.shutdown) {
+            connection.close();
             WebSocketInputOperator.this.activate(null);
           }
         }
@@ -184,6 +185,11 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T>
         }
 
       }));
+
+      if (client != null) {
+        client.closeAsynchronously();
+      }
+
       client = new AsyncHttpClient(config);
       connection = client.prepareGet(uri.toString()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketTextListener()
       {