You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/03/08 00:22:05 UTC
incubator-apex-core git commit: APEXCORE-375 - Container killed
because of Out of Sequence tuple error.
Repository: incubator-apex-core
Updated Branches:
refs/heads/release-3.2 72277ae7d -> 6e0a4be46
APEXCORE-375 - Container killed because of Out of Sequence tuple error.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6e0a4be4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6e0a4be4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6e0a4be4
Branch: refs/heads/release-3.2
Commit: 6e0a4be46e24a2844036d9f710acbb171bc71730
Parents: 72277ae
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Mar 6 10:38:34 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Mar 6 16:52:45 2016 -0800
----------------------------------------------------------------------
.../bufferserver/internal/LogicalNode.java | 24 ++++++-----
.../datatorrent/bufferserver/server/Server.java | 42 ++++++++++++--------
2 files changed, 38 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e0a4be4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index f867d69..3953c3a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -156,18 +156,20 @@ public class LogicalNode implements DataListener
*/
public void catchUp()
{
- long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
- logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
- if (lBaseSeconds > baseSeconds) {
- baseSeconds = lBaseSeconds;
- }
- logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
- int intervalMillis;
-
- int skippedPayloadTuples = 0;
-
+ caughtup = false;
if (isReady()) {
logger.debug("catching up {}->{}", upstream, group);
+
+ long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
+ logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
+ if (lBaseSeconds > baseSeconds) {
+ baseSeconds = lBaseSeconds;
+ }
+ logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
+ int intervalMillis;
+
+ int skippedPayloadTuples = 0;
+
try {
/*
* fast forward to catch up with the windowId without consuming
@@ -337,8 +339,8 @@ public class LogicalNode implements DataListener
{
for (PhysicalNode pn : physicalNodes) {
eventloop.disconnect(pn.getClient());
- physicalNodes.clear();
}
+ physicalNodes.clear();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e0a4be4/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index a78b136..89561f3 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -219,14 +219,15 @@ public class Server implements ServerListener
* @param connection
* @return
*/
- public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, AbstractLengthPrependerClient connection)
+ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
+ final AbstractLengthPrependerClient connection)
{
String identifier = request.getIdentifier();
String type = request.getStreamType();
String upstream_identifier = request.getUpstreamIdentifier();
// Check if there is a logical node of this type, if not create it.
- LogicalNode ln;
+ final LogicalNode ln;
if (subscriberGroups.containsKey(type)) {
//logger.debug("adding to exiting group = {}", subscriberGroups.get(type));
/*
@@ -238,15 +239,23 @@ public class Server implements ServerListener
}
ln = subscriberGroups.get(type);
- ln.boot(eventloop);
- ln.addConnection(connection);
+ serverHelperExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ ln.boot(eventloop);
+ ln.addConnection(connection);
+ ln.catchUp();
+ }
+ });
} else {
/*
* if there is already a datalist registered for the type in which this client is interested,
* then get a iterator on the data items of that data list. If the datalist is not registered,
* then create one and register it. Hopefully this one would be used by future upstream nodes.
*/
- DataList dl;
+ final DataList dl;
if (publisherBuffers.containsKey(upstream_identifier)) {
dl = publisherBuffers.get(upstream_identifier);
//logger.debug("old list = {}", dl);
@@ -270,8 +279,16 @@ public class Server implements ServerListener
}
subscriberGroups.put(type, ln);
- ln.addConnection(connection);
- dl.addDataListener(ln);
+ serverHelperExecutor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ ln.addConnection(connection);
+ ln.catchUp();
+ dl.addDataListener(ln);
+ }
+ });
}
return ln;
@@ -460,16 +477,7 @@ public class Server implements ServerListener
key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
subscriber.registered(key);
- final LogicalNode logicalNode = handleSubscriberRequest(subscriberRequest, subscriber);
- serverHelperExecutor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- logicalNode.catchUp();
- }
-
- });
+ handleSubscriberRequest(subscriberRequest, subscriber);
break;
case PURGE_REQUEST: