You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2018/06/16 17:57:33 UTC
[apex-core] branch master updated: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
This is an automated email from the ASF dual-hosted git repository.
vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new 436785b APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
436785b is described below
commit 436785bd63be0e90265cf8f8f18882647b8ecab0
Author: Pramod Immaneni <pr...@apache.org>
AuthorDate: Wed Jan 17 13:48:32 2018 -0800
APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
---
.../bufferserver/internal/LogicalNode.java | 7 +--
.../datatorrent/bufferserver/server/Server.java | 62 +++++++++-------------
2 files changed, 27 insertions(+), 42 deletions(-)
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d..af5db09 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@ public class LogicalNode implements DataListener
*/
public void removeChannel(WriteOnlyClient client)
{
- for (PhysicalNode pn : physicalNodes) {
- if (pn.getClient() == client) {
- physicalNodes.remove(pn);
- break;
- }
- }
+ physicalNodes.removeIf(node -> (node.getClient().equals(client)));
}
/**
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2..6332a18 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@ public class Server extends AbstractServer
@Override
public void unregistered(SelectionKey key)
{
+ logger.debug("Unregistered {}", this);
for (LogicalNode ln : subscriberGroups.values()) {
ln.boot();
}
+ super.unregistered(key);
/*
* There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
*/
@@ -860,41 +859,32 @@ public class Server extends AbstractServer
}
torndown = true;
- /*
- * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
- * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
- * are not being written to, just stick around till the next publisher shows up and eat into
- * the data it's publishing for the new subscribers.
- */
-
- /**
- * since the publisher server died, the queue which it was using would stop pumping the data unless
- * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
- * with the same identifier as the one which just died.
- */
- if (publisherChannels.containsValue(this)) {
- final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
- while (i.hasNext()) {
- if (i.next().getValue() == this) {
- i.remove();
- break;
- }
- }
- }
-
- ArrayList<LogicalNode> list = new ArrayList<>();
- String publisherIdentifier = datalist.getIdentifier();
- Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
- while (iterator.hasNext()) {
- LogicalNode ln = iterator.next();
- if (publisherIdentifier.equals(ln.getUpstream())) {
- list.add(ln);
+ serverHelperExecutor.submit(() ->
+ {
+ /*
+ * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
+ * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
+ * are not being written to, just stick around till the next publisher shows up and eat into
+ * the data it's publishing for the new subscribers.
+ */
+
+ /**
+ * since the publisher server died, the queue which it was using would stop pumping the data unless
+ * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+ * with the same identifier as the one which just died.
+ */
+ String publisherIdentifier = datalist.getIdentifier();
+ if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+ logger.warn("{} could not be removed from channels", Publisher.this);
}
- }
- for (LogicalNode ln : list) {
- ln.boot();
- }
+ subscriberGroups.forEach((type, ln) -> {
+ if (publisherIdentifier.equals(ln.getUpstream())) {
+ logger.debug("Booting logical node {} from publisher", ln);
+ ln.boot();
+ }
+ });
+ });
}
}
--
To stop receiving notification emails like this one, please contact
vrozov@apache.org.