You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/08 20:12:36 UTC

[2/2] incubator-apex-core git commit: APEXCORE-374 - Block with positive reference count is found during buffer server purge. When a LogicalNode is torn down, its iterator must be closed to decrement the reference count of the block it points to.

APEXCORE-374 - Block with positive reference count is found during buffer server purge. When a LogicalNode is torn down, its iterator must be closed to decrement the reference count of the block it points to.

Conflicts:
	bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9d8d9fd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9d8d9fd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9d8d9fd7

Branch: refs/heads/master
Commit: 9d8d9fd75733eb5b437f578fa3f3525e7f5c87dd
Parents: b487b3b
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Mar 7 18:30:12 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Tue Mar 8 11:11:10 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 57 ++++++--------------
 .../bufferserver/internal/LogicalNode.java      | 35 +++++++-----
 .../datatorrent/bufferserver/server/Server.java |  7 +--
 3 files changed, 39 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 95c32b0..2a01102 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -61,7 +60,6 @@ public class DataList
   private final int blockSize;
   private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap();
   protected final HashSet<DataListener> all_listeners = newHashSet();
-  protected final HashMap<String, DataListIterator> iterators = newHashMap();
   protected Block first;
   protected Block last;
   protected Storage storage;
@@ -332,43 +330,18 @@ public class DataList
     return block.next;
   }
 
-  public Iterator<SerializedData> newIterator(String identifier, long windowId)
+  public DataListIterator newIterator(long windowId)
   {
     //logger.debug("request for a new iterator {} and {}", identifier, windowId);
-    for (Block temp = first; temp != null; temp = temp.next) {
+    Block temp = first;
+    while (temp != last) {
       if (temp.starting_window >= windowId || temp.ending_window > windowId) {
-        DataListIterator dli = getIterator(temp);
-        iterators.put(identifier, dli);
-        //logger.debug("returning new iterator on temp = {}", temp);
-        return dli;
-      }
-    }
-
-    DataListIterator dli = getIterator(last);
-    iterators.put(identifier, dli);
-    //logger.debug("returning new iterator on last = {}", last);
-    return dli;
-  }
-
-  /**
-   * Release previous acquired iterator from this DataList
-   *
-   * @param iterator
-   * @return true if successfully released, false otherwise.
-   */
-  public boolean delIterator(Iterator<SerializedData> iterator)
-  {
-    if (iterator instanceof DataListIterator) {
-      DataListIterator dli = (DataListIterator)iterator;
-      for (Entry<String, DataListIterator> e : iterators.entrySet()) {
-        if (e.getValue() == dli) {
-          dli.close();
-          iterators.remove(e.getKey());
-          return true;
-        }
+        break;
       }
+      temp = temp.next;
     }
-    return false;
+    //logger.debug("returning new iterator on temp = {}", temp);
+    return getIterator(temp);
   }
 
   public void addDataListener(DataListener dl)
@@ -506,19 +479,21 @@ public class DataList
     int oldestBlockIndex = Integer.MAX_VALUE;
     int oldestReadOffset = Integer.MAX_VALUE;
 
-    for (Map.Entry<String, DataListIterator> entry : iterators.entrySet()) {
-      Integer index = indices.get(entry.getValue().da);
+    for (DataListener dl : all_listeners) {
+      LogicalNode logicalNode = (LogicalNode)dl;
+      DataListIterator dli = logicalNode.getIterator();
+      Integer index = indices.get(dli.da);
       if (index == null) {
         // error
         throw new RuntimeException("problemo!");
       }
       if (index < oldestBlockIndex) {
         oldestBlockIndex = index;
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
-      } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) {
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
+      } else if (index == oldestBlockIndex && dli.getReadOffset() < oldestReadOffset) {
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 761bbea..ab76d01 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -20,7 +20,6 @@ package com.datatorrent.bufferserver.internal;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +46,7 @@ import com.datatorrent.netlet.EventLoop;
  */
 public class LogicalNode implements DataListener
 {
+  private final String identifier;
   private final String upstream;
   private final String group;
   private final HashSet<PhysicalNode> physicalNodes;
@@ -59,25 +59,21 @@ public class LogicalNode implements DataListener
 
   /**
    *
+   * @param identifier
    * @param upstream
    * @param group
    * @param iterator
-   * @param skipUptoWindowId
+   * @param skipWindowId
    */
-  public LogicalNode(String upstream, String group, Iterator<SerializedData> iterator, long skipUptoWindowId)
+  public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId)
   {
+    this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
     this.physicalNodes = new HashSet<PhysicalNode>();
     this.partitions = new HashSet<BitVector>();
-
-    if (iterator instanceof DataListIterator) {
-      this.iterator = (DataListIterator)iterator;
-    } else {
-      throw new IllegalArgumentException("iterator does not belong to DataListIterator class");
-    }
-
-    skipWindowId = skipUptoWindowId;
+    this.iterator = iterator;
+    this.skipWindowId = skipWindowId;
   }
 
   /**
@@ -91,13 +87,14 @@ public class LogicalNode implements DataListener
 
   /**
    *
-   * @return Iterator<SerializedData>
+   * @return DataListIterator
    */
-  public Iterator<SerializedData> getIterator()
+  public DataListIterator getIterator()
   {
     return iterator;
   }
 
+
   /**
    *
    * @param connection
@@ -334,6 +331,15 @@ public class LogicalNode implements DataListener
     return upstream;
   }
 
+  /**
+   *
+   * @return the identifier
+   */
+  public String getIdentifier()
+  {
+    return identifier;
+  }
+
   public void boot(EventLoop eventloop)
   {
     for (PhysicalNode pn : physicalNodes) {
@@ -345,7 +351,8 @@ public class LogicalNode implements DataListener
   @Override
   public String toString()
   {
-    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions +
+    return "LogicalNode@" + Integer.toHexString(hashCode()) +
+        "identifier=" + identifier + ", upstream=" + upstream + ", group=" + group + ", partitions=" + partitions +
         ", iterator=" + iterator + '}';
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9d8d9fd7/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 353eb2b..8a1fac7 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -271,10 +271,7 @@ public class Server implements ServerListener
       }
 
       long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId();
-      ln = new LogicalNode(upstream_identifier,
-                           type,
-                           dl.newIterator(identifier, skipWindowId),
-                           skipWindowId);
+      ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId);
 
       int mask = request.getMask();
       if (mask != 0) {
@@ -584,10 +581,10 @@ public class Server implements ServerListener
           DataList dl = publisherBuffers.get(ln.getUpstream());
           if (dl != null) {
             dl.removeDataListener(ln);
-            dl.delIterator(ln.getIterator());
           }
           subscriberGroups.remove(ln.getGroup());
         }
+        ln.getIterator().close();
       }
     }