You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/12/13 00:04:07 UTC
[2/3] apex-core git commit: APEXCORE-583 - Buffer Server LogicalNode
should not be reused by Subscribers
APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d1646e42
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d1646e42
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d1646e42
Branch: refs/heads/master
Commit: d1646e42bdf5594ef34070594733a7ca10123a3f
Parents: d28c0dd
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Tue Dec 6 14:09:03 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Dec 6 14:09:03 2016 -0800
----------------------------------------------------------------------
.../datatorrent/bufferserver/server/Server.java | 98 +++++++++++---------
1 file changed, 55 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d1646e42/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index e720248..af55143 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -249,7 +249,7 @@ public class Server implements ServerListener
* @param request
* @param key
*/
- public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key)
+ private void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key)
{
try {
serverHelperExecutor.submit(new Runnable()
@@ -259,11 +259,11 @@ public class Server implements ServerListener
{
final String upstream_identifier = request.getUpstreamIdentifier();
- /*
- * if there is already a datalist registered for the type in which this client is interested,
- * then get a iterator on the data items of that data list. If the datalist is not registered,
- * then create one and register it. Hopefully this one would be used by future upstream nodes.
- */
+ /*
+ * if there is already a datalist registered for the type in which this client is interested,
+ * then get a iterator on the data items of that data list. If the datalist is not registered,
+ * then create one and register it. Hopefully this one would be used by future upstream nodes.
+ */
DataList dl = publisherBuffers.get(upstream_identifier);
if (dl == null) {
dl = Tuple.FAST_VERSION.equals(request.getVersion()) ?
@@ -302,7 +302,7 @@ public class Server implements ServerListener
}
});
} catch (RejectedExecutionException e) {
- logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e);
+ logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", request, this, key.channel(), e);
if (key.isValid()) {
try {
key.channel().close();
@@ -313,6 +313,52 @@ public class Server implements ServerListener
}
}
+ private void handleSubscriberTeardown(final SelectionKey key)
+ {
+ try {
+ final Subscriber subscriber = (Subscriber)key.attachment();
+ if (subscriber != null) {
+ serverHelperExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ final LogicalNode ln = subscriber.ln;
+ if (ln != null) {
+ ln.removeChannel(subscriber);
+ if (ln.getPhysicalNodeCount() == 0) {
+ DataList dl = publisherBuffers.get(ln.getUpstream());
+ if (dl != null) {
+ logger.info("Removing ln {} from dl {}", ln, dl);
+ dl.removeDataListener(ln);
+ }
+ subscriberGroups.remove(ln.getGroup(), ln);
+ ln.getIterator().close();
+ }
+ subscriber.ln = null;
+ }
+ } catch (Throwable t) {
+ logger.error("Buffer server {} failed to tear down subscriber {}.", Server.this, subscriber, t);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return subscriber + " teardown task.";
+ }
+ });
+ } else {
+ logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment());
+ }
+ } catch (ClassCastException e) {
+ logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment());
+ } catch (RejectedExecutionException e) {
+ logger.error("Subscriber {} teardown after server {} termination.", key.attachment(), this, e);
+ }
+ }
+
/**
*
* @param request
@@ -521,48 +567,14 @@ public class Server implements ServerListener
@Override
public void unregistered(final SelectionKey key)
{
- try {
- serverHelperExecutor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- teardown();
- }
-
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) +
- " teardown " + Subscriber.this;
- }
- });
- } catch (Exception e) {
- logger.error("{}", this, e);
- }
+ handleSubscriberTeardown(key);
super.unregistered(key);
}
- private void teardown()
- {
- if (ln != null) {
- ln.removeChannel(Subscriber.this);
- if (ln.getPhysicalNodeCount() == 0) {
- DataList dl = publisherBuffers.get(ln.getUpstream());
- if (dl != null) {
- dl.removeDataListener(ln);
- }
- subscriberGroups.remove(ln.getGroup(), ln);
- ln.getIterator().close();
- ln = null;
- }
- }
- }
-
@Override
public String toString()
{
- return "Server.Subscriber{" + "ln=" + ln + "}";
+ return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + ln + "}";
}
}