You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:35 UTC

[34/50] [abbrv] incubator-apex-core git commit: APEX-184 #resolve 1. In autoFlushExecutor don't exit run() until there is at least one listener that has more data to send. 2. Do not enable read in resumeReadIfSuspended when not able to switch to a new bu

APEX-184 #resolve
1. In autoFlushExecutor don't exit run() until there is at least one listener that has more data to send.
2. Do not enable read in resumeReadIfSuspended when not able to switch to a new buffer.
3. Fix possible race condition in Block acquire.
4. Fix for incorrect counting of in memory block permits.
5. Fix check style violations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4c6d3f5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4c6d3f5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4c6d3f5b

Branch: refs/heads/master
Commit: 4c6d3f5b34a163c74c8ff80763a57a62a7a297c2
Parents: 979a0ef
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Oct 9 17:35:30 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Oct 20 19:43:02 2015 -0700

----------------------------------------------------------------------
 bufferserver/pom.xml                            |   2 +-
 .../bufferserver/internal/DataList.java         | 248 ++++++++++++-------
 .../bufferserver/internal/DataListener.java     |   2 +-
 .../bufferserver/internal/FastDataList.java     |  11 +-
 .../bufferserver/internal/LogicalNode.java      |  37 ++-
 .../datatorrent/bufferserver/server/Server.java | 108 ++++----
 6 files changed, 232 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/pom.xml
----------------------------------------------------------------------
diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml
index 1346ba7..6b9a848 100644
--- a/bufferserver/pom.xml
+++ b/bufferserver/pom.xml
@@ -51,7 +51,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>124</maxAllowedViolations>
+          <maxAllowedViolations>60</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index f5af2e5..1f6c273 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -19,8 +19,13 @@
 package com.datatorrent.bufferserver.internal;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,7 +45,6 @@ import com.datatorrent.bufferserver.util.VarInt;
 import com.datatorrent.netlet.AbstractClient;
 import com.datatorrent.netlet.util.VarInt.MutableInt;
 
-import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Sets.newHashSet;
 
@@ -66,9 +70,10 @@ public class DataList
   protected int size;
   protected int processingOffset;
   protected long baseSeconds;
-  private final List<AbstractClient> suspendedClients = newArrayList();
+  private final Set<AbstractClient> suspendedClients = newHashSet();
   private final AtomicInteger numberOfInMemBlockPermits;
   private MutableInt nextOffset = new MutableInt();
+  private Future<?> future;
 
   public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks)
   {
@@ -106,13 +111,20 @@ public class DataList
       for (Block temp = first; temp != null; temp = temp.next) {
         if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
           if (temp != last) {
+            last.refCount.decrementAndGet();
             last = temp;
             do {
               temp = temp.next;
               temp.discard(false);
-              if (temp.data != null) {
-                temp.data = null;
-                numberOfInMemBlockRewound++;
+              synchronized (temp) {
+                if (temp.refCount.get() != 0) {
+                  logger.debug("Discarded block {} has positive reference count. Listeners: {}", temp, all_listeners);
+                  throw new IllegalStateException("Discarded block " + temp + " has positive reference count!");
+                }
+                if (temp.data != null) {
+                  temp.data = null;
+                  numberOfInMemBlockRewound++;
+                }
               }
             } while (temp.next != null);
             last.next = null;
@@ -148,8 +160,13 @@ public class DataList
         Block temp = first;
         while (temp != last) {
           temp.discard(false);
-          temp.data = null;
-          temp = temp.next;
+          synchronized (temp) {
+            if (temp.refCount.get() != 0) {
+              throw new IllegalStateException("Discarded block " + temp + " not zero reference count!");
+            }
+            temp.data = null;
+            temp = temp.next;
+          }
         }
       }
       first = last;
@@ -173,9 +190,15 @@ public class DataList
           break;
         }
         temp.discard(false);
-        if (temp.data != null) {
-          temp.data = null;
-          numberOfInMemBlockPurged++;
+        synchronized (temp) {
+          if (temp.refCount.get() != 0) {
+            logger.debug("Discarded block {} has positive reference count. Listeners: {}", temp, all_listeners);
+            throw new IllegalStateException("Discarded block " + temp + " has positive reference count!");
+          }
+          if (temp.data != null) {
+            temp.data = null;
+            numberOfInMemBlockPurged++;
+          }
         }
       }
     }
@@ -202,21 +225,15 @@ public class DataList
     do {
       while (size == 0) {
         size = VarInt.read(last.data, processingOffset, writeOffset, nextOffset);
-        switch (nextOffset.integer) {
-          case -5:
-            throw new RuntimeException("problemo!");
-
-          case -4:
-          case -3:
-          case -2:
-          case -1:
-          case 0:
-            if (writeOffset == last.data.length) {
-              nextOffset.integer = 0;
-              processingOffset = 0;
-              size = 0;
-            }
-            break flush;
+        if (nextOffset.integer > -5 && nextOffset.integer < 1) {
+          if (writeOffset == last.data.length) {
+            nextOffset.integer = 0;
+            processingOffset = 0;
+            size = 0;
+          }
+          break flush;
+        } else if (nextOffset.integer == -5) {
+          throw new RuntimeException("problemo!");
         }
       }
 
@@ -240,6 +257,9 @@ public class DataList
             Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
             baseSeconds = (long)rwt.getBaseSeconds() << 32;
             break;
+
+          default:
+            break;
         }
         processingOffset += size;
         size = 0;
@@ -255,17 +275,28 @@ public class DataList
 
     last.writingOffset = writeOffset;
 
-    autoFlushExecutor.submit(new Runnable()
-    {
-      @Override
-      public void run()
+    notifyListeners();
+
+  }
+
+  public void notifyListeners()
+  {
+    if (future == null || future.isDone() || future.isCancelled()) {
+      future = autoFlushExecutor.submit(new Runnable()
       {
-        for (DataListener dl : all_listeners) {
-          dl.addedData();
+        @Override
+        public void run()
+        {
+          boolean atLeastOneListenerHasDataToSend;
+          do {
+            atLeastOneListenerHasDataToSend = false;
+            for (DataListener dl : all_listeners) {
+              atLeastOneListenerHasDataToSend |= dl.addedData();
+            }
+          } while (atLeastOneListenerHasDataToSend);
         }
-      }
-
-    });
+      });
+    }
   }
 
   public void setAutoFlushExecutor(final ExecutorService es)
@@ -381,7 +412,7 @@ public class DataList
   public boolean suspendRead(final AbstractClient client)
   {
     synchronized (suspendedClients) {
-      return client.suspendReadIfResumed() && suspendedClients.add(client);
+      return suspendedClients.add(client) && client.suspendReadIfResumed();
     }
   }
 
@@ -395,6 +426,8 @@ public class DataList
         }
         suspendedClients.clear();
       }
+    } else {
+      logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners);
     }
     return resumedSuspendedClients;
   }
@@ -409,7 +442,7 @@ public class DataList
     return new byte[blockSize];
   }
 
-  public void addBuffer(byte[] array)
+  public synchronized void addBuffer(byte[] array)
   {
     final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet();
     if (numberOfInMemBlockPermits < 0) {
@@ -468,8 +501,7 @@ public class DataList
         oldestBlockIndex = index;
         oldestReadOffset = entry.getValue().getReadOffset();
         status.slowestConsumer = entry.getKey();
-      }
-      else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) {
+      } else if (index == oldestBlockIndex && entry.getValue().getReadOffset() < oldestReadOffset) {
         oldestReadOffset = entry.getValue().getReadOffset();
         status.slowestConsumer = entry.getKey();
       }
@@ -481,8 +513,7 @@ public class DataList
       status.numBytesAllocated += b.data.length;
       if (oldestBlockIndex == i) {
         status.numBytesWaiting += b.writingOffset - oldestReadOffset;
-      }
-      else if (oldestBlockIndex < i) {
+      } else if (oldestBlockIndex < i) {
         status.numBytesWaiting += b.writingOffset - b.readingOffset;
       }
       b = b.next;
@@ -508,7 +539,7 @@ public class DataList
     /**
      * actual data - stored as length followed by actual data.
      */
-    byte data[];
+    byte[] data;
     /**
      * readingOffset is the offset of the first valid byte in the array.
      */
@@ -536,8 +567,8 @@ public class DataList
     /**
      * how count of references to this block.
      */
-    AtomicInteger refCount;
-    Future future;
+    private final AtomicInteger refCount;
+    private Future<?> future;
 
     public Block(String id, int size)
     {
@@ -566,8 +597,7 @@ public class DataList
         if (current.offset + current.length > writingOffset) {
           current.length = 0;
         }
-      }
-      else {
+      } else {
         current.length = 0;
       }
     }
@@ -581,7 +611,7 @@ public class DataList
           SerializedData sd = dli.next();
           switch (sd.buffer[sd.dataOffset]) {
             case MessageType.RESET_WINDOW_VALUE:
-              ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
               bs = (long)rwt.getBaseSeconds() << 32;
               if (bs > windowId) {
                 writingOffset = sd.offset;
@@ -590,12 +620,15 @@ public class DataList
               break;
 
             case MessageType.BEGIN_WINDOW_VALUE:
-              BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
               if ((bs | bwt.getWindowId()) >= windowId) {
                 writingOffset = sd.offset;
                 break done;
               }
               break;
+
+            default:
+              break;
           }
         }
       }
@@ -628,19 +661,19 @@ public class DataList
           SerializedData sd = dli.next();
           switch (sd.buffer[sd.dataOffset]) {
             case MessageType.RESET_WINDOW_VALUE:
-              ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
               bs = (long)rwt.getBaseSeconds() << 32;
               lastReset = sd;
               break;
 
             case MessageType.BEGIN_WINDOW_VALUE:
-              BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
               if ((bs | bwt.getWindowId()) > longWindowId) {
                 found = true;
                 if (lastReset != null) {
-                /*
-                 * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
-                 */
+                  /*
+                   * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
+                   */
                   if (sd.offset >= lastReset.length) {
                     sd.offset -= lastReset.length;
                     if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
@@ -655,6 +688,10 @@ public class DataList
 
                 break done;
               }
+              break;
+
+            default:
+              break;
           }
         }
       }
@@ -671,8 +708,7 @@ public class DataList
           System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length);
           this.starting_window = this.ending_window = bs;
           //logger.debug("=20140220= reassign the windowids {}", this);
-        }
-        else {
+        } else {
           this.readingOffset = this.writingOffset;
           this.starting_window = this.ending_window = longWindowId;
           //logger.debug("=20140220= avoid the windowids {}", this);
@@ -692,8 +728,7 @@ public class DataList
           sd.offset = 0;
           sd.dataOffset = VarInt.write(sd.length - i, sd.buffer, sd.offset, i);
           sd.buffer[sd.dataOffset] = MessageType.NO_MESSAGE_VALUE;
-        }
-        else {
+        } else {
           logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset);
         }
 
@@ -710,15 +745,17 @@ public class DataList
         {
           byte[] data = storage.retrieve(identifier, uniqueIdentifier);
           synchronized (Block.this) {
-            Block.this.data = data;
-            readingOffset = 0;
-            writingOffset = data.length;
-            if (refCount.get() > 1) {
+            if (Block.this.data == null) {
+              Block.this.data = data;
+              readingOffset = 0;
+              writingOffset = data.length;
               Block.this.notifyAll();
-            }
-            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet();
-            if (numberOfInMemBlockPermits < 0) {
-              logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
+              int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet();
+              if (numberOfInMemBlockPermits < 0) {
+                logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
+              }
+            } else {
+              logger.debug("Block {} was already loaded into memory", Block.this);
             }
           }
         }
@@ -727,20 +764,34 @@ public class DataList
 
     protected void acquire(boolean wait)
     {
-      if (refCount.getAndIncrement() == 0 && storage != null && data == null) {
+      int refCount = this.refCount.getAndIncrement();
+      synchronized (Block.this) {
+        if (data != null) {
+          return;
+        }
+      }
+      if (refCount == 0 && storage != null) {
         final Runnable retriever = getRetriever();
+        if (future != null && future.cancel(false)) {
+          logger.debug("Block {} future is cancelled", this);
+        }
         if (wait) {
+          future = null;
           retriever.run();
         } else {
           future = storageExecutor.submit(retriever);
         }
-      } else if (wait && data == null) {
+      } else if (wait) {
         try {
           synchronized (Block.this) {
-            wait();
+            if (future == null) {
+              throw new IllegalStateException("No task is scheduled to retrieve block " + Block.this);
+            }
+            while (data == null) {
+              wait();
+            }
           }
-        }
-        catch (InterruptedException ex) {
+        } catch (InterruptedException ex) {
           throw new RuntimeException("Interrupted while waiting for data to be loaded!", ex);
         }
       }
@@ -758,15 +809,16 @@ public class DataList
           }
           if (uniqueIdentifier == 0) {
             logger.warn("Storage returned unexpectedly, please check the status of the spool directory!");
-          }
-          else {
-            //logger.debug("Spooled {} to disk", Block.this);
+          } else {
+            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get();
             synchronized (Block.this) {
-              if (refCount.get() == 0) {
+              if (refCount.get() == 0 && Block.this.data != null) {
                 Block.this.data = null;
+                numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet();
+              } else {
+                logger.debug("Keeping Block {} unchanged", Block.this);
               }
             }
-            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet();
             assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
             resumeSuspendedClients(numberOfInMemBlockPermits);
           }
@@ -780,10 +832,17 @@ public class DataList
       if (refCount == 0 && storage != null) {
         assert (next != null);
         final Runnable storer = getStorer(data, readingOffset, writingOffset, storage);
-        if (wait && numberOfInMemBlockPermits.get() == 0) {
+        if (future != null && future.cancel(false)) {
+          logger.debug("Block {} future is cancelled", this);
+        }
+        final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get();
+        if (wait && numberOfInMemBlockPermits == 0) {
+          future = null;
           storer.run();
-        } else if (numberOfInMemBlockPermits.get() < MAX_COUNT_OF_INMEM_BLOCKS/2) {
+        } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) {
           future = storageExecutor.submit(storer);
+        } else {
+          future = null;
         }
       } else {
         logger.debug("Holding {} in memory due to {} references.", this, refCount);
@@ -809,11 +868,12 @@ public class DataList
     protected void discard(final boolean wait)
     {
       if (storage != null) {
-        if (future != null) {
-          future.cancel(false);
-        }
         final Runnable discarder = getDiscarder();
+        if (future != null && future.cancel(false)) {
+          logger.debug("Block {} future is cancelled", this);
+        }
         if (wait) {
+          future = null;
           discarder.run();
         } else {
           future = storageExecutor.submit(discarder);
@@ -828,7 +888,7 @@ public class DataList
              + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset
              + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window)
              + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
-             + '}';
+             + ", future=" + (future == null ? "null" : future.isDone() ? "Done" : future.isCancelled() ? "Cancelled" : future) + '}';
     }
 
   }
@@ -895,19 +955,13 @@ public class DataList
     {
       while (size == 0) {
         size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset);
-        switch (nextOffset.integer) {
-          case -5:
-            throw new RuntimeException("problemo!");
-
-          case -4:
-          case -3:
-          case -2:
-          case -1:
-          case 0:
-            if (da.writingOffset == buffer.length && switchToNextBlock()) {
-              continue;
-            }
-            return false;
+        if (nextOffset.integer > -5 && nextOffset.integer < 1) {
+          if (da.writingOffset == buffer.length && switchToNextBlock()) {
+            continue;
+          }
+          return false;
+        } else if (size == -5) {
+          throw new RuntimeException("problemo!");
         }
       }
 
@@ -965,6 +1019,12 @@ public class DataList
       size = 0;
     }
 
+    @Override
+    public String toString()
+    {
+      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{da=" + da + '}';
+    }
+
   }
 
   private static final Logger logger = LoggerFactory.getLogger(DataList.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
index fd9cebc..4add008 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java
@@ -36,7 +36,7 @@ public interface DataListener
 
   /**
    */
-  public void addedData();
+  public boolean addedData();
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index 939d0c1..6ba7b64 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -99,17 +99,8 @@ public class FastDataList extends DataList
 
     last.writingOffset = writeOffset;
 
-    autoFlushExecutor.submit(new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        for (DataListener dl : all_listeners) {
-          dl.addedData();
-        }
-      }
+    notifyListeners();
 
-    });
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 40a8207..f867d69 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -73,8 +73,7 @@ public class LogicalNode implements DataListener
 
     if (iterator instanceof DataListIterator) {
       this.iterator = (DataListIterator)iterator;
-    }
-    else {
+    } else {
       throw new IllegalArgumentException("iterator does not belong to DataListIterator class");
     }
 
@@ -195,12 +194,12 @@ public class LogicalNode implements DataListener
             case MessageType.BEGIN_WINDOW_VALUE:
               tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
               logger.debug("{}->{} condition {} =? {}",
-                           new Object[] {
-                upstream,
-                group,
-                Codec.getStringWindowId(baseSeconds | tuple.getWindowId()),
-                Codec.getStringWindowId(skipWindowId)
-              });
+                  new Object[] {
+                      upstream,
+                      group,
+                      Codec.getStringWindowId(baseSeconds | tuple.getWindowId()),
+                      Codec.getStringWindowId(skipWindowId)
+                  });
               if ((baseSeconds | tuple.getWindowId()) > skipWindowId) {
                 logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples);
                 ready = GiveAll.getInstance().distribute(physicalNodes, data);
@@ -219,8 +218,7 @@ public class LogicalNode implements DataListener
               logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes);
           }
         }
-      }
-      catch (InterruptedException ie) {
+      } catch (InterruptedException ie) {
         throw new RuntimeException(ie);
       }
 
@@ -232,9 +230,8 @@ public class LogicalNode implements DataListener
     logger.debug("Exiting catch up because caughtup = {}", caughtup);
   }
 
-  @SuppressWarnings("fallthrough")
   @Override
-  public void addedData()
+  public boolean addedData()
   {
     if (isReady()) {
       if (caughtup) {
@@ -257,6 +254,8 @@ public class LogicalNode implements DataListener
                 case MessageType.RESET_WINDOW_VALUE:
                   Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
                   baseSeconds = (long)resetWindow.getBaseSeconds() << 32;
+                  ready = GiveAll.getInstance().distribute(physicalNodes, data);
+                  break;
 
                 default:
                   //logger.debug("sending data of type {}", MessageType.valueOf(data.buffer[data.dataOffset]));
@@ -264,8 +263,7 @@ public class LogicalNode implements DataListener
                   break;
               }
             }
-          }
-          else {
+          } else {
             while (ready && iterator.hasNext()) {
               SerializedData data = iterator.next();
               switch (data.buffer[data.dataOffset]) {
@@ -287,6 +285,8 @@ public class LogicalNode implements DataListener
                 case MessageType.RESET_WINDOW_VALUE:
                   tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset);
                   baseSeconds = (long)tuple.getBaseSeconds() << 32;
+                  ready = GiveAll.getInstance().distribute(physicalNodes, data);
+                  break;
 
                 default:
                   ready = GiveAll.getInstance().distribute(physicalNodes, data);
@@ -294,15 +294,14 @@ public class LogicalNode implements DataListener
               }
             }
           }
-        }
-        catch (InterruptedException ie) {
+        } catch (InterruptedException ie) {
           throw new RuntimeException(ie);
         }
-      }
-      else {
+      } else {
         catchUp();
       }
     }
+    return !ready;
   }
 
   /**
@@ -345,7 +344,7 @@ public class LogicalNode implements DataListener
   @Override
   public String toString()
   {
-    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + '}';
+    return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}';
   }
 
   private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4c6d3f5b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 9f31e02..cd45738 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -29,7 +29,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +43,12 @@ import org.slf4j.LoggerFactory;
 import com.datatorrent.bufferserver.internal.DataList;
 import com.datatorrent.bufferserver.internal.FastDataList;
 import com.datatorrent.bufferserver.internal.LogicalNode;
-import com.datatorrent.bufferserver.packet.*;
+import com.datatorrent.bufferserver.packet.PayloadTuple;
+import com.datatorrent.bufferserver.packet.PublishRequestTuple;
+import com.datatorrent.bufferserver.packet.PurgeRequestTuple;
+import com.datatorrent.bufferserver.packet.ResetRequestTuple;
+import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
+import com.datatorrent.bufferserver.packet.Tuple;
 import com.datatorrent.bufferserver.storage.Storage;
 import com.datatorrent.common.util.NameableThreadFactory;
 import com.datatorrent.netlet.AbstractLengthPrependerClient;
@@ -100,16 +111,15 @@ public class Server implements ServerListener
   @Override
   public void unregistered(SelectionKey key)
   {
-        serverHelperExecutor.shutdown();
-        storageHelperExecutor.shutdown();
-        try {
-          serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException ex) {
-          logger.debug("Executor Termination", ex);
-        }
-        logger.info("Server stopped listening at {}", address);
-      }
+    serverHelperExecutor.shutdown();
+    storageHelperExecutor.shutdown();
+    try {
+      serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ex) {
+      logger.debug("Executor Termination", ex);
+    }
+    logger.info("Server stopped listening at {}", address);
+  }
 
   public synchronized InetSocketAddress run(EventLoop eventloop)
   {
@@ -117,8 +127,7 @@ public class Server implements ServerListener
     while (address == null) {
       try {
         wait(20);
-      }
-      catch (InterruptedException ex) {
+      } catch (InterruptedException ex) {
         throw new RuntimeException(ex);
       }
     }
@@ -142,8 +151,7 @@ public class Server implements ServerListener
     int port;
     if (args.length > 0) {
       port = Integer.parseInt(args[0]);
-    }
-    else {
+    } else {
       port = 0;
     }
 
@@ -173,8 +181,7 @@ public class Server implements ServerListener
     byte[] message;
     if (dl == null) {
       message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes();
-    }
-    else {
+    } else {
       dl.purge(request.getBaseSeconds(), request.getWindowId());
       message = ("Request sent for processing: " + request).getBytes();
     }
@@ -201,8 +208,7 @@ public class Server implements ServerListener
     byte[] message;
     if (dl == null) {
       message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes();
-    }
-    else {
+    } else {
       AbstractLengthPrependerClient channel = publisherChannels.remove(request.getIdentifier());
       if (channel != null) {
         eventloop.disconnect(channel);
@@ -252,8 +258,7 @@ public class Server implements ServerListener
       ln = subscriberGroups.get(type);
       ln.boot(eventloop);
       ln.addConnection(connection);
-    }
-    else {
+    } else {
       /*
        * if there is already a datalist registered for the type in which this client is interested,
        * then get a iterator on the data items of that data list. If the datalist is not registered,
@@ -263,8 +268,7 @@ public class Server implements ServerListener
       if (publisherBuffers.containsKey(upstream_identifier)) {
         dl = publisherBuffers.get(upstream_identifier);
         //logger.debug("old list = {}", dl);
-      }
-      else {
+      } else {
         dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
         publisherBuffers.put(upstream_identifier, dl);
         //logger.debug("new list = {}", dl);
@@ -315,12 +319,10 @@ public class Server implements ServerListener
       dl = publisherBuffers.get(identifier);
       try {
         dl.rewind(request.getBaseSeconds(), request.getWindowId());
-      }
-      catch (IOException ie) {
+      } catch (IOException ie) {
         throw new RuntimeException(ie);
       }
-    }
-    else {
+    } else {
       dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks);
       publisherBuffers.put(identifier, dl);
     }
@@ -422,8 +424,7 @@ public class Server implements ServerListener
               }
 
             };
-          }
-          else {
+          } else {
             publisher = new Publisher(dl, (long)request.getBaseSeconds() << 32 | request.getWindowId());
           }
 
@@ -457,8 +458,7 @@ public class Server implements ServerListener
 //          }
           if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
             subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize);
-          }
-          else {
+          } else {
             subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize)
             {
               @Override
@@ -494,8 +494,7 @@ public class Server implements ServerListener
           logger.info("Received purge request: {}", request);
           try {
             handlePurgeRequest((PurgeRequestTuple)request, this);
-          }
-          catch (IOException io) {
+          } catch (IOException io) {
             throw new RuntimeException(io);
           }
           break;
@@ -504,8 +503,7 @@ public class Server implements ServerListener
           logger.info("Received reset all request: {}", request);
           try {
             handleResetRequest((ResetRequestTuple)request, this);
-          }
-          catch (IOException io) {
+          } catch (IOException io) {
             throw new RuntimeException(io);
           }
           break;
@@ -636,9 +634,13 @@ public class Server implements ServerListener
         {
           final int interestOps = key.interestOps();
           if ((interestOps & SelectionKey.OP_READ) == 0) {
-            logger.debug("Resuming read on key {} with attachment {}", key, key.attachment());
-            read(0);
-            key.interestOps(interestOps | SelectionKey.OP_READ);
+            if (readExt(0)) {
+              logger.debug("Resuming read on key {} with attachment {}", key, key.attachment());
+              key.interestOps(interestOps | SelectionKey.OP_READ);
+            } else {
+              logger.debug("Keeping read on key {} with attachment {} suspended. ", key, key.attachment(), datalist);
+              datalist.notifyListeners();
+            }
           }
         }
       });
@@ -648,6 +650,11 @@ public class Server implements ServerListener
     @Override
     public void read(int len)
     {
+      readExt(len);
+    }
+
+    private boolean readExt(int len)
+    {
       //logger.debug("read {} bytes", len);
       writeOffset += len;
       do {
@@ -664,18 +671,20 @@ public class Server implements ServerListener
                    * new byteBuffer and start as if we always had full room but not enough data.
                    */
                   if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) {
-                    return;
+                    return false;
                   }
                 }
-              }
-              else if (dirty) {
+              } else if (dirty) {
                 dirty = false;
                 datalist.flush(writeOffset);
               }
-              return;
+              return true;
 
             case 0:
               continue;
+
+            default:
+              break;
           }
         }
 
@@ -683,8 +692,7 @@ public class Server implements ServerListener
           onMessage(buffer, readOffset, size);
           readOffset += size;
           size = 0;
-        }
-        else {
+        } else {
           if (writeOffset == buffer.length) {
             dirty = false;
             datalist.flush(writeOffset);
@@ -694,14 +702,14 @@ public class Server implements ServerListener
             if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) {
               readOffset -= VarInt.getSize(size);
               size = 0;
-              return;
+              return false;
             }
             size = 0;
           } else if (dirty) {
             dirty = false;
             datalist.flush(writeOffset);
           }
-          return;
+          return true;
         }
       }
       while (true);
@@ -751,8 +759,7 @@ public class Server implements ServerListener
       if (cce instanceof RejectedExecutionException && serverHelperExecutor.isTerminated()) {
         logger.warn("Terminated Executor Exception for {}.", this, cce);
         el.disconnect(this);
-      }
-      else {
+      } else {
         super.handleException(cce, el);
       }
     }
@@ -836,8 +843,7 @@ public class Server implements ServerListener
         if (len < remainingCapacity) {
           remainingCapacity = len;
           byteBuffer.position(writeOffset + remainingCapacity);
-        }
-        else {
+        } else {
           byteBuffer.position(buffer.length);
         }
         System.arraycopy(array, offset, buffer, writeOffset, remainingCapacity);