You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/03/08 00:22:05 UTC

incubator-apex-core git commit: APEXCORE-375 - Container killed because of Out of Sequence tuple error.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 72277ae7d -> 6e0a4be46


APEXCORE-375 - Container killed because of Out of Sequence tuple error.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6e0a4be4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6e0a4be4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6e0a4be4

Branch: refs/heads/release-3.2
Commit: 6e0a4be46e24a2844036d9f710acbb171bc71730
Parents: 72277ae
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Sun Mar 6 10:38:34 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Mar 6 16:52:45 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/LogicalNode.java      | 24 ++++++-----
 .../datatorrent/bufferserver/server/Server.java | 42 ++++++++++++--------
 2 files changed, 38 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e0a4be4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index f867d69..3953c3a 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -156,18 +156,20 @@ public class LogicalNode implements DataListener
    */
   public void catchUp()
   {
-    long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
-    logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
-    if (lBaseSeconds > baseSeconds) {
-      baseSeconds = lBaseSeconds;
-    }
-    logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
-    int intervalMillis;
-
-    int skippedPayloadTuples = 0;
-
+    caughtup = false;
     if (isReady()) {
       logger.debug("catching up {}->{}", upstream, group);
+
+      long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
+      logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
+      if (lBaseSeconds > baseSeconds) {
+        baseSeconds = lBaseSeconds;
+      }
+      logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
+      int intervalMillis;
+
+      int skippedPayloadTuples = 0;
+
       try {
         /*
          * fast forward to catch up with the windowId without consuming
@@ -337,8 +339,8 @@ public class LogicalNode implements DataListener
   {
     for (PhysicalNode pn : physicalNodes) {
       eventloop.disconnect(pn.getClient());
-      physicalNodes.clear();
     }
+    physicalNodes.clear();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6e0a4be4/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index a78b136..89561f3 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -219,14 +219,15 @@ public class Server implements ServerListener
    * @param connection
    * @return
    */
-  public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, AbstractLengthPrependerClient connection)
+  public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
+      final AbstractLengthPrependerClient connection)
   {
     String identifier = request.getIdentifier();
     String type = request.getStreamType();
     String upstream_identifier = request.getUpstreamIdentifier();
 
     // Check if there is a logical node of this type, if not create it.
-    LogicalNode ln;
+    final LogicalNode ln;
     if (subscriberGroups.containsKey(type)) {
       //logger.debug("adding to exiting group = {}", subscriberGroups.get(type));
       /*
@@ -238,15 +239,23 @@ public class Server implements ServerListener
       }
 
       ln = subscriberGroups.get(type);
-      ln.boot(eventloop);
-      ln.addConnection(connection);
+      serverHelperExecutor.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          ln.boot(eventloop);
+          ln.addConnection(connection);
+          ln.catchUp();
+        }
+      });
     } else {
       /*
        * if there is already a datalist registered for the type in which this client is interested,
        * then get a iterator on the data items of that data list. If the datalist is not registered,
        * then create one and register it. Hopefully this one would be used by future upstream nodes.
        */
-      DataList dl;
+      final DataList dl;
       if (publisherBuffers.containsKey(upstream_identifier)) {
         dl = publisherBuffers.get(upstream_identifier);
         //logger.debug("old list = {}", dl);
@@ -270,8 +279,16 @@ public class Server implements ServerListener
       }
 
       subscriberGroups.put(type, ln);
-      ln.addConnection(connection);
-      dl.addDataListener(ln);
+      serverHelperExecutor.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          ln.addConnection(connection);
+          ln.catchUp();
+          dl.addDataListener(ln);
+        }
+      });
     }
 
     return ln;
@@ -460,16 +477,7 @@ public class Server implements ServerListener
           key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
           subscriber.registered(key);
 
-          final LogicalNode logicalNode = handleSubscriberRequest(subscriberRequest, subscriber);
-          serverHelperExecutor.submit(new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              logicalNode.catchUp();
-            }
-
-          });
+          handleSubscriberRequest(subscriberRequest, subscriber);
           break;
 
         case PURGE_REQUEST: