You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/08 19:51:14 UTC

incubator-apex-core git commit: APEXCORE-374 - Block with positive reference count is found during buffer server purge. When a LogicalNode is torn down, its iterator must be closed to decrement the reference count of the block it points to.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 6e0a4be46 -> f06544896


APEXCORE-374 - Block with positive reference count is found during buffer server purge. When a LogicalNode is torn down, its iterator must be closed to decrement the reference count of the block it points to.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f0654489
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f0654489
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f0654489

Branch: refs/heads/release-3.2
Commit: f065448968ddd65d6666577b6b5e254e2c4d395d
Parents: 6e0a4be
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Mar 7 18:30:12 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Mar 8 10:18:19 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 57 ++++++--------------
 .../bufferserver/internal/LogicalNode.java      | 36 ++++++++-----
 .../datatorrent/bufferserver/server/Server.java |  7 +--
 3 files changed, 40 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0654489/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 06bfbf6..58bb2fe 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -61,7 +60,6 @@ public class DataList
   private final int blockSize;
   private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap();
   protected final HashSet<DataListener> all_listeners = newHashSet();
-  protected final HashMap<String, DataListIterator> iterators = newHashMap();
   protected Block first;
   protected Block last;
   protected Storage storage;
@@ -323,43 +321,18 @@ public class DataList
     return block.next;
   }
 
-  public Iterator<SerializedData> newIterator(String identifier, long windowId)
+  public DataListIterator newIterator(long windowId)
   {
     //logger.debug("request for a new iterator {} and {}", identifier, windowId);
-    for (Block temp = first; temp != null; temp = temp.next) {
+    Block temp = first;
+    while (temp != last) {
       if (temp.starting_window >= windowId || temp.ending_window > windowId) {
-        DataListIterator dli = getIterator(temp);
-        iterators.put(identifier, dli);
-        //logger.debug("returning new iterator on temp = {}", temp);
-        return dli;
-      }
-    }
-
-    DataListIterator dli = getIterator(last);
-    iterators.put(identifier, dli);
-    //logger.debug("returning new iterator on last = {}", last);
-    return dli;
-  }
-
-  /**
-   * Release previous acquired iterator from this DataList
-   *
-   * @param iterator
-   * @return true if successfully released, false otherwise.
-   */
-  public boolean delIterator(Iterator<SerializedData> iterator)
-  {
-    if (iterator instanceof DataListIterator) {
-      DataListIterator dli = (DataListIterator)iterator;
-      for (Entry<String, DataListIterator> e : iterators.entrySet()) {
-        if (e.getValue() == dli) {
-          dli.close();
-          iterators.remove(e.getKey());
-          return true;
-        }
+        break;
       }
+      temp = temp.next;
     }
-    return false;
+    //logger.debug("returning new iterator on temp = {}", temp);
+    return getIterator(temp);
   }
 
   public void addDataListener(DataListener dl)
@@ -496,19 +469,21 @@ public class DataList
     int oldestBlockIndex = Integer.MAX_VALUE;
     int oldestReadOffset = Integer.MAX_VALUE;
 
-    for (Map.Entry<String, DataListIterator> entry : iterators.entrySet()) {
-      Integer index = indices.get(entry.getValue().da);
+    for (DataListener dl : all_listeners) {
+      LogicalNode logicalNode = (LogicalNode)dl;
+      DataListIterator dli = logicalNode.getIterator();
+      Integer index = indices.get(dli.da);
       if (index == null) {
         // error
         throw new RuntimeException("problemo!");
       }
       if (index < oldestBlockIndex) {
         oldestBlockIndex = index;
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
-      } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) {
-        oldestReadOffset = entry.getValue().getReadOffset();
-        status.slowestConsumer = entry.getKey();
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
+      } else if (index == oldestBlockIndex && dli.getReadOffset() < oldestReadOffset) {
+        oldestReadOffset = dli.getReadOffset();
+        status.slowestConsumer = logicalNode.getIdentifier();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0654489/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3953c3a..f22b767 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -20,7 +20,6 @@ package com.datatorrent.bufferserver.internal;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +46,7 @@ import com.datatorrent.netlet.EventLoop;
  */
 public class LogicalNode implements DataListener
 {
+  private final String identifier;
   private final String upstream;
   private final String group;
   private final HashSet<PhysicalNode> physicalNodes;
@@ -59,25 +59,21 @@ public class LogicalNode implements DataListener
 
   /**
    *
+   * @param identifier
    * @param upstream
    * @param group
    * @param iterator
-   * @param skipUptoWindowId
+   * @param skipWindowId
    */
-  public LogicalNode(String upstream, String group, Iterator<SerializedData> iterator, long skipUptoWindowId)
+  public LogicalNode(String identifier, String upstream, String group, DataListIterator iterator, long skipWindowId)
   {
+    this.identifier = identifier;
     this.upstream = upstream;
     this.group = group;
     this.physicalNodes = new HashSet<PhysicalNode>();
     this.partitions = new HashSet<BitVector>();
-
-    if (iterator instanceof DataListIterator) {
-      this.iterator = (DataListIterator)iterator;
-    } else {
-      throw new IllegalArgumentException("iterator does not belong to DataListIterator class");
-    }
-
-    skipWindowId = skipUptoWindowId;
+    this.iterator = iterator;
+    this.skipWindowId = skipWindowId;
   }
 
   /**
@@ -91,13 +87,14 @@ public class LogicalNode implements DataListener
 
   /**
    *
-   * @return Iterator<SerializedData>
+   * @return DataListIterator
    */
-  public Iterator<SerializedData> getIterator()
+  public DataListIterator getIterator()
   {
     return iterator;
   }
 
+
   /**
    *
    * @param connection
@@ -335,6 +332,15 @@ public class LogicalNode implements DataListener
     return upstream;
   }
 
+  /**
+   *
+   * @return the identifier
+   */
+  public String getIdentifier()
+  {
+    return identifier;
+  }
+
   public void boot(EventLoop eventloop)
   {
     for (PhysicalNode pn : physicalNodes) {
@@ -346,7 +352,9 @@ public class LogicalNode implements DataListener
   @Override
   public String toString()
   {
-    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}';
+    return "LogicalNode@" + Integer.toHexString(hashCode()) +
+        "identifier=" + identifier + ", upstream=" + upstream + ", group=" + group + ", partitions=" + partitions +
+        ", iterator=" + iterator + '}';
   }
 
   private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f0654489/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 89561f3..c39605c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -266,10 +266,7 @@ public class Server implements ServerListener
       }
 
       long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId();
-      ln = new LogicalNode(upstream_identifier,
-                           type,
-                           dl.newIterator(identifier, skipWindowId),
-                           skipWindowId);
+      ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId);
 
       int mask = request.getMask();
       if (mask != 0) {
@@ -573,10 +570,10 @@ public class Server implements ServerListener
           DataList dl = publisherBuffers.get(ln.getUpstream());
           if (dl != null) {
             dl.removeDataListener(ln);
-            dl.delIterator(ln.getIterator());
           }
           subscriberGroups.remove(ln.getGroup());
         }
+        ln.getIterator().close();
       }
     }