You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/12/13 00:04:07 UTC

[2/3] apex-core git commit: APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers

APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d1646e42
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d1646e42
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d1646e42

Branch: refs/heads/master
Commit: d1646e42bdf5594ef34070594733a7ca10123a3f
Parents: d28c0dd
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Dec 6 14:09:03 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Dec 6 14:09:03 2016 -0800

----------------------------------------------------------------------
 .../datatorrent/bufferserver/server/Server.java | 98 +++++++++++---------
 1 file changed, 55 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d1646e42/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index e720248..af55143 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -249,7 +249,7 @@ public class Server implements ServerListener
    * @param request
    * @param key
    */
-  public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key)
+  private void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key)
   {
     try {
       serverHelperExecutor.submit(new Runnable()
@@ -259,11 +259,11 @@ public class Server implements ServerListener
         {
           final String upstream_identifier = request.getUpstreamIdentifier();
 
-        /*
-         * if there is already a datalist registered for the type in which this client is interested,
-         * then get a iterator on the data items of that data list. If the datalist is not registered,
-         * then create one and register it. Hopefully this one would be used by future upstream nodes.
-         */
+          /*
+           * if there is already a datalist registered for the type in which this client is interested,
+           * then get a iterator on the data items of that data list. If the datalist is not registered,
+           * then create one and register it. Hopefully this one would be used by future upstream nodes.
+           */
           DataList dl = publisherBuffers.get(upstream_identifier);
           if (dl == null) {
             dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
@@ -302,7 +302,7 @@ public class Server implements ServerListener
         }
       });
     } catch (RejectedExecutionException e) {
-      logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e);
+      logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", request, this, key.channel(), e);
       if (key.isValid()) {
         try {
           key.channel().close();
@@ -313,6 +313,52 @@ public class Server implements ServerListener
     }
   }
 
+  private void handleSubscriberTeardown(final SelectionKey key)
+  {
+    try {
+      final Subscriber subscriber = (Subscriber)key.attachment();
+      if (subscriber != null) {
+        serverHelperExecutor.submit(new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            try {
+              final LogicalNode ln = subscriber.ln;
+              if (ln != null) {
+                ln.removeChannel(subscriber);
+                if (ln.getPhysicalNodeCount() == 0) {
+                  DataList dl = publisherBuffers.get(ln.getUpstream());
+                  if (dl != null) {
+                    logger.info("Removing ln {} from dl {}", ln, dl);
+                    dl.removeDataListener(ln);
+                  }
+                  subscriberGroups.remove(ln.getGroup(), ln);
+                  ln.getIterator().close();
+                }
+                subscriber.ln = null;
+              }
+            } catch (Throwable t) {
+              logger.error("Buffer server {} failed to tear down subscriber {}.", Server.this, subscriber, t);
+            }
+          }
+
+          @Override
+          public String toString()
+          {
+            return subscriber + " teardown task.";
+          }
+        });
+      } else {
+        logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment());
+      }
+    } catch (ClassCastException e) {
+      logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment());
+    } catch (RejectedExecutionException e) {
+      logger.error("Subscriber {} teardown after server {} termination.", key.attachment(), this, e);
+    }
+  }
+
   /**
    *
    * @param request
@@ -521,48 +567,14 @@ public class Server implements ServerListener
     @Override
     public void unregistered(final SelectionKey key)
     {
-      try {
-        serverHelperExecutor.submit(new Runnable()
-        {
-          @Override
-          public void run()
-          {
-            teardown();
-          }
-
-          @Override
-          public String toString()
-          {
-            return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) +
-                " teardown " + Subscriber.this;
-          }
-        });
-      } catch (Exception e) {
-        logger.error("{}", this, e);
-      }
+      handleSubscriberTeardown(key);
       super.unregistered(key);
     }
 
-    private void teardown()
-    {
-      if (ln != null) {
-        ln.removeChannel(Subscriber.this);
-        if (ln.getPhysicalNodeCount() == 0) {
-          DataList dl = publisherBuffers.get(ln.getUpstream());
-          if (dl != null) {
-            dl.removeDataListener(ln);
-          }
-          subscriberGroups.remove(ln.getGroup(), ln);
-          ln.getIterator().close();
-          ln = null;
-        }
-      }
-    }
-
     @Override
     public String toString()
     {
-      return "Server.Subscriber{" + "ln=" + ln + "}";
+      return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + ln + "}";
     }
   }