Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/11 03:04:59 UTC

[1/2] incubator-apex-core git commit: APEX-68: Buffer server should use a separate thread to spool blocks to disk

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 97cbef6c7 -> 928d36804


APEX-68: Buffer server should use a separate thread to spool blocks to disk


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1b8aecf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1b8aecf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1b8aecf3

Branch: refs/heads/devel-3
Commit: 1b8aecf3b429069b92a790a4aae2e41490934de7
Parents: f2a4071
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Wed Sep 9 15:41:30 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Wed Sep 9 15:41:30 2015 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 598 +++++++++++--------
 .../bufferserver/internal/FastDataList.java     |  64 +-
 .../datatorrent/bufferserver/server/Server.java | 101 +++-
 3 files changed, 425 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
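
For readers skimming the patch: the change replaces addBuffer()'s scan over all blocks with an AtomicInteger of in-memory block permits. The counter is decremented when a new block is appended and incremented when a block is spooled to disk or discarded; publishers whose socket reads were suspended for lack of memory are resumed once permits become available, and spooling, retrieval and discarding now run on a dedicated storage executor instead of the publisher's network thread. The snippet below is only an illustrative sketch of that permit pattern with made-up names (BlockPermits, blockAdded, blockFreed); it is not code from the patch.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/*
 * Sketch only: a permit counter that bounds the number of blocks held in
 * memory and hands spooling off to a separate thread, similar in spirit to
 * what the patch adds to DataList. All names here are illustrative.
 */
class BlockPermits
{
  private final AtomicInteger permits;
  private final ExecutorService storageExecutor = Executors.newSingleThreadExecutor();

  BlockPermits(final int maxInMemBlocks)
  {
    if (maxInMemBlocks < 1) {
      throw new IllegalArgumentException("Invalid number of memory blocks " + maxInMemBlocks);
    }
    // one permit is implicitly held by the block currently being written to
    permits = new AtomicInteger(maxInMemBlocks - 1);
  }

  // Publishers check this before allocating a new block buffer.
  boolean isMemoryBlockAvailable()
  {
    return permits.get() > 0;
  }

  // A full block was appended; spool it on the storage thread, not the network thread.
  void blockAdded(final Runnable spooler)
  {
    permits.decrementAndGet();
    storageExecutor.submit(spooler);
  }

  // A block's memory was freed (spooled, purged or rewound); wake suspended publishers.
  void blockFreed(final Runnable resumeSuspendedClients)
  {
    if (permits.incrementAndGet() > 0) {
      resumeSuspendedClients.run();
    }
  }
}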


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index baa052a..6806168 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -19,6 +19,8 @@ import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,8 +34,13 @@ import com.datatorrent.bufferserver.util.BitVector;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.bufferserver.util.SerializedData;
 import com.datatorrent.bufferserver.util.VarInt;
+import com.datatorrent.netlet.AbstractClient;
 import com.datatorrent.netlet.util.VarInt.MutableInt;
 
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
 /**
  * Maintains list of data and manages addition and deletion of the data<p>
  * <br>
@@ -44,83 +51,137 @@ public class DataList
 {
   private final int MAX_COUNT_OF_INMEM_BLOCKS;
   protected final String identifier;
-  private final Integer blocksize;
-  private HashMap<BitVector, HashSet<DataListener>> listeners = new HashMap<BitVector, HashSet<DataListener>>();
-  protected HashSet<DataListener> all_listeners = new HashSet<DataListener>();
+  private final int blockSize;
+  private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap();
+  protected final HashSet<DataListener> all_listeners = newHashSet();
+  protected final HashMap<String, DataListIterator> iterators = newHashMap();
   protected Block first;
   protected Block last;
   protected Storage storage;
-  protected ExecutorService autoflushExecutor;
+  protected ExecutorService autoFlushExecutor;
   protected ExecutorService storageExecutor;
+  protected int size;
+  protected int processingOffset;
+  protected long baseSeconds;
+  private final List<AbstractClient> suspendedClients = newArrayList();
+  private final AtomicInteger numberOfInMemBlockPermits;
+  private MutableInt nextOffset = new MutableInt();
+
+  public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks)
+  {
+    if (numberOfCacheBlocks < 1) {
+      throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks);
+    }
+    this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks;
+    numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1);
+    this.identifier = identifier;
+    this.blockSize = blockSize;
+    first = last = new Block(identifier, blockSize);
+  }
 
-  public int getBlockSize()
+  public DataList(String identifier)
   {
-    return blocksize;
+    /*
+     * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem.
+     * we will use default value of 8 block sizes to be cached in memory
+     */
+    this(identifier, 64 * 1024 * 1024, 8);
   }
 
-  public void rewind(int baseSeconds, int windowId) throws IOException
+  public int getBlockSize()
   {
-    long longWindowId = (long)baseSeconds << 32 | windowId;
+    return blockSize;
+  }
 
-    for (Block temp = first; temp != null; temp = temp.next) {
+  public void rewind(final int baseSeconds, final int windowId) throws IOException
+  {
+    final long longWindowId = (long)baseSeconds << 32 | windowId;
+    logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId));
 
-      if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
-        if (temp != last) {
-          temp.next = null;
-          last = temp;
+    int numberOfInMemBlockRewound = 0;
+    synchronized (this) {
+      for (Block temp = first; temp != null; temp = temp.next) {
+        if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
+          if (temp != last) {
+            last = temp;
+            do {
+              temp = temp.next;
+              temp.discard(false);
+              if (temp.data != null) {
+                temp.data = null;
+                numberOfInMemBlockRewound++;
+              }
+            } while (temp.next != null);
+            last.next = null;
+            last.acquire(true);
+          }
+          this.baseSeconds = last.rewind(longWindowId);
+          processingOffset = last.writingOffset;
+          size = 0;
+          break;
         }
-
-        this.baseSeconds = temp.rewind(longWindowId);
-        processingOffset = temp.writingOffset;
-        size = 0;
       }
     }
 
-    for (DataListIterator dli : iterators.values()) {
-      dli.rewind(processingOffset);
-    }
+    /*
+      TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block.
+    */
+
+    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound);
+    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+    resumeSuspendedClients(numberOfInMemBlockPermits);
+    logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
+
   }
 
   public void reset()
   {
+    logger.debug("Resetting {}", this);
     listeners.clear();
     all_listeners.clear();
 
-    if (storage != null) {
-      while (first != null) {
-        if (first.uniqueIdentifier > 0) {
-          logger.debug("discarding {} {} in reset", identifier, first.uniqueIdentifier);
-          storage.discard(identifier, first.uniqueIdentifier);
+    synchronized (this) {
+      if (storage != null) {
+        Block temp = first;
+        while (temp != last) {
+          temp.discard(false);
+          temp.data = null;
+          temp = temp.next;
         }
-        first = first.next;
       }
+      first = last;
     }
+    numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1);
   }
 
-  public void purge(int baseSeconds, int windowId)
+  public void purge(final int baseSeconds, final int windowId)
   {
-    long longWindowId = (long)baseSeconds << 32 | windowId;
-    logger.debug("purge request for windowId {}", Codec.getStringWindowId(longWindowId));
-
-    Block prev = null;
-    for (Block temp = first; temp != null && temp.starting_window <= longWindowId; temp = temp.next) {
-      if (temp.ending_window > longWindowId || temp == last) {
-        if (prev != null) {
-          first = temp;
+    final long longWindowId = (long)baseSeconds << 32 | windowId;
+    logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId));
+
+    int numberOfInMemBlockPurged = 0;
+    synchronized (this) {
+      for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) {
+        if (temp.ending_window > longWindowId || temp == last) {
+          if (prev != null) {
+            first = temp;
+          }
+          first.purge(longWindowId);
+          break;
+        }
+        temp.discard(false);
+        if (temp.data != null) {
+          temp.data = null;
+          numberOfInMemBlockPurged++;
         }
-
-        first.purge(longWindowId);
-        break;
       }
+    }
 
-      if (storage != null && temp.uniqueIdentifier > 0) {
-        logger.debug("discarding {} {} in purge", identifier, temp.uniqueIdentifier);
-
-        storage.discard(identifier, temp.uniqueIdentifier);
-      }
+    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged);
+    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+    resumeSuspendedClients(numberOfInMemBlockPermits);
+    logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
 
-      prev = temp;
-    }
   }
 
   /**
@@ -131,35 +192,6 @@ public class DataList
     return identifier;
   }
 
-  public DataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount)
-  {
-    this(identifier, blocksize, numberOfCacheBlocks);
-    first.refCount = refCount;
-  }
-
-  public DataList(String identifier, int blocksize, int numberOfCacheBlocks)
-  {
-    this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks;
-    this.identifier = identifier;
-    this.blocksize = blocksize;
-    first = new Block(identifier, blocksize);
-    last = first;
-  }
-
-  public DataList(String identifier)
-  {
-    /*
-     * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem.
-     * we will use default value of 8 block sizes to be cached in memory
-     */
-    this(identifier, 64 * 1024 * 1024, 8);
-  }
-
-  MutableInt nextOffset = new MutableInt();
-  long baseSeconds;
-  int size;
-  int processingOffset;
-
   public void flush(final int writeOffset)
   {
     //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset);
@@ -195,8 +227,7 @@ public class DataList
               last.starting_window = baseSeconds | bwt.getWindowId();
               last.ending_window = last.starting_window;
               //logger.debug("assigned both window id {}", last);
-            }
-            else {
+            } else {
               last.ending_window = baseSeconds | bwt.getWindowId();
               //logger.debug("assigned last window id {}", last);
             }
@@ -209,8 +240,7 @@ public class DataList
         }
         processingOffset += size;
         size = 0;
-      }
-      else {
+      } else {
         if (writeOffset == last.data.length) {
           nextOffset.integer = 0;
           processingOffset = 0;
@@ -218,12 +248,11 @@ public class DataList
         }
         break;
       }
-    }
-    while (true);
+    } while (true);
 
     last.writingOffset = writeOffset;
 
-    autoflushExecutor.submit(new Runnable()
+    autoFlushExecutor.submit(new Runnable()
     {
       @Override
       public void run()
@@ -236,9 +265,9 @@ public class DataList
     });
   }
 
-  public void setAutoflushExecutor(final ExecutorService es)
+  public void setAutoFlushExecutor(final ExecutorService es)
   {
-    autoflushExecutor = es;
+    autoFlushExecutor = es;
   }
 
   public void setSecondaryStorage(Storage storage, ExecutorService es)
@@ -250,13 +279,16 @@ public class DataList
   /*
    * Iterator related functions.
    */
-  protected final HashMap<String, DataListIterator> iterators = new HashMap<String, DataListIterator>();
-
-  public DataListIterator getIterator(Block block)
+  protected DataListIterator getIterator(final Block block)
   {
     return new DataListIterator(block);
   }
 
+  private synchronized Block getNextBlock(final Block block)
+  {
+    return block.next;
+  }
+
   public Iterator<SerializedData> newIterator(String identifier, long windowId)
   {
     //logger.debug("request for a new iterator {} and {}", identifier, windowId);
@@ -283,21 +315,17 @@ public class DataList
    */
   public boolean delIterator(Iterator<SerializedData> iterator)
   {
-    boolean released = false;
     if (iterator instanceof DataListIterator) {
       DataListIterator dli = (DataListIterator)iterator;
       for (Entry<String, DataListIterator> e : iterators.entrySet()) {
         if (e.getValue() == dli) {
-          if (dli.da != null) {
-            dli.da.release(false);
-          }
+          dli.close();
           iterators.remove(e.getKey());
-          released = true;
-          break;
+          return true;
         }
       }
     }
-    return released;
+    return false;
   }
 
   public void addDataListener(DataListener dl)
@@ -310,20 +338,17 @@ public class DataList
         HashSet<DataListener> set;
         if (listeners.containsKey(partition)) {
           set = listeners.get(partition);
-        }
-        else {
+        } else {
           set = new HashSet<DataListener>();
           listeners.put(partition, set);
         }
         set.add(dl);
       }
-    }
-    else {
+    } else {
       HashSet<DataListener> set;
       if (listeners.containsKey(DataListener.NULL_PARTITION)) {
         set = listeners.get(DataListener.NULL_PARTITION);
-      }
-      else {
+      } else {
         set = new HashSet<DataListener>();
         listeners.put(DataListener.NULL_PARTITION, set);
       }
@@ -341,8 +366,7 @@ public class DataList
           listeners.get(partition).remove(dl);
         }
       }
-    }
-    else {
+    } else {
       if (listeners.containsKey(DataListener.NULL_PARTITION)) {
         listeners.get(DataListener.NULL_PARTITION).remove(dl);
       }
@@ -351,43 +375,46 @@ public class DataList
     all_listeners.remove(dl);
   }
 
-  public void addBuffer(byte[] array)
+  public boolean suspendRead(final AbstractClient client)
   {
-    last.next = new Block(identifier, array);
-    last.next.starting_window = last.ending_window;
-    last.next.ending_window = last.ending_window;
-    last = last.next;
-
-    //logger.debug("addbuffer last = {}", last);
-    int inmemBlockCount;
+    synchronized (suspendedClients) {
+      return client.suspendReadIfResumed() && suspendedClients.add(client);
+    }
+  }
 
-    inmemBlockCount = 0;
-    for (Block temp = first; temp != null; temp = temp.next) {
-      if (temp.data != null) {
-        inmemBlockCount++;
+  public boolean resumeSuspendedClients(final int numberOfInMemBlockPermits)
+  {
+    boolean resumedSuspendedClients = false;
+    if (numberOfInMemBlockPermits > 0) {
+      synchronized (suspendedClients) {
+        for (AbstractClient client : suspendedClients) {
+          resumedSuspendedClients |= client.resumeReadIfSuspended();
+        }
+        suspendedClients.clear();
       }
     }
+    return resumedSuspendedClients;
+  }
 
-    if (inmemBlockCount >= MAX_COUNT_OF_INMEM_BLOCKS) {
-      //logger.debug("InmemBlockCount before releaes {}", inmemBlockCount);
-      for (Block temp = first; temp != null; temp = temp.next) {
-        boolean found = false;
-        for (DataListIterator iterator : iterators.values()) {
-          if (iterator.da == temp) {
-            found = true;
-            break;
-          }
-        }
+  public boolean isMemoryBlockAvailable()
+  {
+    return numberOfInMemBlockPermits.get() > 0;
+  }
 
-        if (!found && temp.data != null) {
-          temp.release(true);
-          if (--inmemBlockCount < MAX_COUNT_OF_INMEM_BLOCKS) {
-            break;
-          }
-        }
-      }
-      //logger.debug("InmemBlockCount after release {}", inmemBlockCount);
+  public byte[] newBuffer()
+  {
+    return new byte[blockSize];
+  }
+
+  public void addBuffer(byte[] array)
+  {
+    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet();
+    if (numberOfInMemBlockPermits < 0) {
+      logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
     }
+    last.next = new Block(identifier, array, last.ending_window, last.ending_window);
+    last.release(false);
+    last = last.next;
   }
 
   public byte[] getBuffer(long windowId)
@@ -461,6 +488,12 @@ public class DataList
     return status;
   }
 
+  @Override
+  public String toString()
+  {
+    return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}';
+  }
+
   /**
    * <p>Block class.</p>
    *
@@ -484,7 +517,7 @@ public class DataList
     /**
      * The starting window which is available in this data array.
      */
-    long starting_window = -1;
+    long starting_window;
     /**
      * the ending window which is available in this data array
      */
@@ -500,7 +533,8 @@ public class DataList
     /**
      * how count of references to this block.
      */
-    int refCount;
+    AtomicInteger refCount;
+    Future future;
 
     public Block(String id, int size)
     {
@@ -509,9 +543,17 @@ public class DataList
 
     public Block(String id, byte[] array)
     {
+      this(id, array, -1, 0);
+    }
+
+    public Block(final String id, final byte[] array, final long starting_window, final long ending_window)
+    {
       identifier = id;
       data = array;
-      refCount = 1;
+      refCount = new AtomicInteger(1);
+      this.starting_window = starting_window;
+      this.ending_window = ending_window;
+      //logger.debug("Allocated new {}", this);
     }
 
     void getNextData(SerializedData current)
@@ -530,27 +572,28 @@ public class DataList
     public long rewind(long windowId)
     {
       long bs = starting_window & 0x7fffffff00000000L;
-      DataListIterator dli = getIterator(this);
-      done:
-      while (dli.hasNext()) {
-        SerializedData sd = dli.next();
-        switch (sd.buffer[sd.dataOffset]) {
-          case MessageType.RESET_WINDOW_VALUE:
-            ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
-            bs = (long)rwt.getBaseSeconds() << 32;
-            if (bs > windowId) {
-              writingOffset = sd.offset;
-              break done;
-            }
-            break;
+      try (DataListIterator dli = getIterator(this)) {
+        done:
+        while (dli.hasNext()) {
+          SerializedData sd = dli.next();
+          switch (sd.buffer[sd.dataOffset]) {
+            case MessageType.RESET_WINDOW_VALUE:
+              ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              bs = (long)rwt.getBaseSeconds() << 32;
+              if (bs > windowId) {
+                writingOffset = sd.offset;
+                break done;
+              }
+              break;
 
-          case MessageType.BEGIN_WINDOW_VALUE:
-            BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
-            if ((bs | bwt.getWindowId()) >= windowId) {
-              writingOffset = sd.offset;
-              break done;
-            }
-            break;
+            case MessageType.BEGIN_WINDOW_VALUE:
+              BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              if ((bs | bwt.getWindowId()) >= windowId) {
+                writingOffset = sd.offset;
+                break done;
+              }
+              break;
+          }
         }
       }
 
@@ -558,16 +601,12 @@ public class DataList
         starting_window = windowId;
         ending_window = windowId;
         //logger.debug("assigned both window id {}", this);
-      }
-      else if (windowId < ending_window) {
+      } else if (windowId < ending_window) {
         ending_window = windowId;
         //logger.debug("assigned end window id {}", this);
       }
 
-      if (uniqueIdentifier != 0) {
-        storage.discard(identifier, uniqueIdentifier);
-        uniqueIdentifier = 0;
-      }
+      discard(false);
 
       return bs;
     }
@@ -580,39 +619,40 @@ public class DataList
       long bs = starting_window & 0xffffffff00000000L;
       SerializedData lastReset = null;
 
-      DataListIterator dli = getIterator(this);
-      done:
-      while (dli.hasNext()) {
-        SerializedData sd = dli.next();
-        switch (sd.buffer[sd.dataOffset]) {
-          case MessageType.RESET_WINDOW_VALUE:
-            ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
-            bs = (long)rwt.getBaseSeconds() << 32;
-            lastReset = sd;
-            break;
-
-          case MessageType.BEGIN_WINDOW_VALUE:
-            BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
-            if ((bs | bwt.getWindowId()) > longWindowId) {
-              found = true;
-              if (lastReset != null) {
+      try (DataListIterator dli = getIterator(this)) {
+        done:
+        while (dli.hasNext()) {
+          SerializedData sd = dli.next();
+          switch (sd.buffer[sd.dataOffset]) {
+            case MessageType.RESET_WINDOW_VALUE:
+              ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              bs = (long)rwt.getBaseSeconds() << 32;
+              lastReset = sd;
+              break;
+
+            case MessageType.BEGIN_WINDOW_VALUE:
+              BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+              if ((bs | bwt.getWindowId()) > longWindowId) {
+                found = true;
+                if (lastReset != null) {
                 /*
                  * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
                  */
-                if (sd.offset >= lastReset.length) {
-                  sd.offset -= lastReset.length;
-                  if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
-                    System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length);
+                  if (sd.offset >= lastReset.length) {
+                    sd.offset -= lastReset.length;
+                    if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
+                      System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length);
+                    }
                   }
+
+                  this.starting_window = bs | bwt.getWindowId();
+                  this.readingOffset = sd.offset;
+                  //logger.debug("assigned starting window id {}", this);
                 }
 
-                this.starting_window = bs | bwt.getWindowId();
-                this.readingOffset = sd.offset;
-                //logger.debug("assigned starting window id {}", this);
+                break done;
               }
-
-              break done;
-            }
+          }
         }
       }
 
@@ -654,46 +694,44 @@ public class DataList
           logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset);
         }
 
-        if (uniqueIdentifier != 0) {
-          storage.discard(identifier, uniqueIdentifier);
-          uniqueIdentifier = 0;
-        }
+        discard(false);
       }
     }
 
-    private Runnable getRetriever(final int uniqueIdentifier, final Storage storage)
+    private Runnable getRetriever()
     {
       return new Runnable()
       {
         @Override
         public void run()
         {
-          byte[] lData = storage.retrieve(identifier, uniqueIdentifier);
+          byte[] data = storage.retrieve(identifier, uniqueIdentifier);
           synchronized (Block.this) {
-            data = lData;
+            Block.this.data = data;
             readingOffset = 0;
             writingOffset = data.length;
-            if (refCount > 1) {
+            if (refCount.get() > 1) {
               Block.this.notifyAll();
             }
+            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet();
+            if (numberOfInMemBlockPermits < 0) {
+              logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
+            }
           }
         }
-
       };
     }
 
-    synchronized void acquire(boolean wait)
+    protected void acquire(boolean wait)
     {
-      if (refCount++ == 0 && uniqueIdentifier > 0 && storage != null) {
-        assert (data == null);
+      if (refCount.getAndIncrement() == 0 && storage != null && data == null) {
+        final Runnable retriever = getRetriever();
         if (wait) {
-          getRetriever(uniqueIdentifier, storage).run();
+          retriever.run();
+        } else {
+          future = storageExecutor.submit(retriever);
         }
-        else {
-          storageExecutor.submit(getRetriever(uniqueIdentifier, storage));
-        }
-      }
-      else if (wait && data == null) {
+      } else if (wait && data == null) {
         try {
           wait();
         }
@@ -710,35 +748,70 @@ public class DataList
         @Override
         public void run()
         {
-          int i = storage.store(identifier, data, readingOffset, writingOffset);
-          if (i == 0) {
+          if (uniqueIdentifier == 0) {
+            uniqueIdentifier = storage.store(identifier, data, readingOffset, writingOffset);
+          }
+          if (uniqueIdentifier == 0) {
             logger.warn("Storage returned unexpectedly, please check the status of the spool directory!");
           }
           else {
+            //logger.debug("Spooled {} to disk", Block.this);
             synchronized (Block.this) {
-              Block.this.uniqueIdentifier = i;
-              if (refCount == 0) {
+              if (refCount.get() == 0) {
                 Block.this.data = null;
               }
             }
+            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet();
+            assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+            resumeSuspendedClients(numberOfInMemBlockPermits);
           }
         }
+      };
+    }
+
+    protected void release(boolean wait)
+    {
+      final int refCount = this.refCount.decrementAndGet();
+      if (refCount == 0 && storage != null) {
+        assert (next != null);
+        final Runnable storer = getStorer(data, readingOffset, writingOffset, storage);
+        if (wait && numberOfInMemBlockPermits.get() == 0) {
+          storer.run();
+        } else if (numberOfInMemBlockPermits.get() < MAX_COUNT_OF_INMEM_BLOCKS/2) {
+          future = storageExecutor.submit(storer);
+        }
+      } else {
+        logger.debug("Holding {} in memory due to {} references.", this, refCount);
+      }
+    }
 
+    private Runnable getDiscarder()
+    {
+      return new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          if (uniqueIdentifier > 0) {
+            logger.debug("Discarding {}", Block.this);
+            storage.discard(identifier, uniqueIdentifier);
+            uniqueIdentifier = 0;
+          }
+        }
       };
     }
 
-    synchronized void release(boolean wait)
+    protected void discard(final boolean wait)
     {
-      if (--refCount == 0 && storage != null) {
-        if (uniqueIdentifier != 0) {
-          data = null;
-          return;
+      if (storage != null) {
+        if (future != null) {
+          future.cancel(false);
         }
+        final Runnable discarder = getDiscarder();
         if (wait) {
-          getStorer(data, readingOffset, writingOffset, storage).run();
-        }
-        else {
-          storageExecutor.submit(getStorer(data, readingOffset, writingOffset, storage));
+          discarder.run();
+        } else {
+          future = storageExecutor.submit(discarder);
         }
       }
     }
@@ -746,10 +819,10 @@ public class DataList
     @Override
     public String toString()
     {
-      return "Block{" + "identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
+      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
              + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset
              + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window)
-             + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
+             + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
              + '}';
     }
 
@@ -760,7 +833,7 @@ public class DataList
    *
    * @since 0.3.2
    */
-  public class DataListIterator implements Iterator<SerializedData>
+  public class DataListIterator implements Iterator<SerializedData>, AutoCloseable
   {
     Block da;
     SerializedData current;
@@ -792,12 +865,28 @@ public class DataList
       return readOffset;
     }
 
+    protected boolean switchToNextBlock()
+    {
+      Block next = getNextBlock(da);
+      if (next == null) {
+        return false;
+      }
+      //logger.debug("{}: switching to the next block {}->{}", this, da, da.next);
+      next.acquire(true);
+      da.release(false);
+      da = next;
+      size = 0;
+      buffer = da.data;
+      readOffset = da.readingOffset;
+      return true;
+    }
+
     /**
      *
      * @return boolean
      */
     @Override
-    public synchronized boolean hasNext()
+    public boolean hasNext()
     {
       while (size == 0) {
         size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset);
@@ -810,53 +899,26 @@ public class DataList
           case -2:
           case -1:
           case 0:
-            if (da.writingOffset == buffer.length) {
-              if (da.next == null) {
-                return false;
-              }
-
-              da.release(false);
-              da.next.acquire(true);
-              da = da.next;
-              size = 0;
-              buffer = da.data;
-              readOffset = da.readingOffset;
-            }
-            else {
-              return false;
+            if (da.writingOffset == buffer.length && switchToNextBlock()) {
+              continue;
             }
+            return false;
         }
       }
 
-      while (true) {
-        if (nextOffset.integer + size <= da.writingOffset) {
-          current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
-          current.dataOffset = nextOffset.integer;
-          //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
-          //  Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
-          //  logger.debug("next t = {}", t);
-          //}
-          return true;
-        }
-        else {
-          if (da.writingOffset == buffer.length) {
-            if (da.next == null) {
-              return false;
-            }
-            else {
-              da.release(false);
-              da.next.acquire(true);
-              da = da.next;
-              size = 0;
-              readOffset = nextOffset.integer = da.readingOffset;
-              buffer = da.data;
-            }
-          }
-          else {
-            return false;
-          }
-        }
+      if (nextOffset.integer + size <= da.writingOffset) {
+        current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
+        current.dataOffset = nextOffset.integer;
+        //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
+        //  Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
+        //  logger.debug("next t = {}", t);
+        //}
+        return true;
+      } else if (da.writingOffset == buffer.length && switchToNextBlock()) {
+        nextOffset.integer = da.readingOffset;
+        return hasNext();
       }
+      return false;
     }
 
     /**
@@ -882,6 +944,16 @@ public class DataList
       current.buffer[current.dataOffset] = MessageType.NO_MESSAGE_VALUE;
     }
 
+    @Override
+    public void close()
+    {
+      if (da != null) {
+        da.release(false);
+        da = null;
+        buffer = null;
+      }
+    }
+
     void rewind(int processingOffset)
     {
       readOffset = processingOffset;
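
One detail worth calling out in the hunks above: DataListIterator now implements AutoCloseable, and Block.rewind() and Block.purge() walk it inside try-with-resources so the backing Block reference is always released. Below is a hedged sketch of that pattern; the scanBlock() method is hypothetical, and since getIterator(Block) is protected the fragment would have to live inside DataList or a subclass.

/*
 * Sketch only: walking the AutoCloseable DataListIterator safely.
 */
void scanBlock(final Block block)
{
  try (DataListIterator dli = getIterator(block)) {
    while (dli.hasNext()) {
      SerializedData sd = dli.next();
      // inspect sd.buffer[sd.dataOffset] for RESET_WINDOW / BEGIN_WINDOW tuples
    }
  } // close() releases the Block reference even on an early return or exception
}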

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index d260b37..fe0d9f4 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -39,12 +39,6 @@ public class FastDataList extends DataList
     super(identifier, blocksize, numberOfCacheBlocks);
   }
 
-  public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount)
-  {
-    super(identifier, blocksize, numberOfCacheBlocks, refCount);
-  }
-
-
   long item;
 
   @Override
@@ -102,7 +96,7 @@ public class FastDataList extends DataList
 
     last.writingOffset = writeOffset;
 
-    autoflushExecutor.submit(new Runnable()
+    autoFlushExecutor.submit(new Runnable()
     {
       @Override
       public void run()
@@ -116,7 +110,7 @@ public class FastDataList extends DataList
   }
 
   @Override
-  public FastDataListIterator getIterator(Block block)
+  protected FastDataListIterator getIterator(Block block)
   {
     return new FastDataListIterator(block);
   }
@@ -188,59 +182,37 @@ public class FastDataList extends DataList
     }
 
     @Override
-    public synchronized boolean hasNext()
+    public boolean hasNext()
     {
       while (size == 0) {
         if (da.writingOffset - readOffset >= 2) {
           size = buffer[readOffset];
           size |= (buffer[readOffset + 1] << 8);
-        }
-        else {
-          if (da.writingOffset == buffer.length) {
-            if (da.next == null) {
-              return false;
-            }
-
-            da.release(false);
-            da.next.acquire(true);
-            da = da.next;
-            size = 0;
-            buffer = da.data;
-            readOffset = da.readingOffset;
-          }
-          else {
+        } else {
+          if (da.writingOffset == buffer.length && switchToNextBlock()) {
+            continue;
+          } else {
             return false;
           }
         }
       }
 
-      while (true) {
-        if (readOffset + size + 2 <= da.writingOffset) {
-          current = new SerializedData(buffer, readOffset, size + 2);
-          current.dataOffset = readOffset + 2;
-          return true;
-        }
-        else {
-          if (da.writingOffset == buffer.length) {
-            if (da.next == null) {
-              return false;
-            }
-            else {
-              da.release(false);
-              da.next.acquire(true);
-              da = da.next;
-              size = 0;
-              readOffset = nextOffset.integer = da.readingOffset;
-              buffer = da.data;
-            }
-          }
-          else {
+      if (readOffset + size + 2 <= da.writingOffset) {
+        current = new SerializedData(buffer, readOffset, size + 2);
+        current.dataOffset = readOffset + 2;
+        return true;
+      } else {
+        if (da.writingOffset == buffer.length) {
+          if (!switchToNextBlock()) {
             return false;
           }
+          nextOffset.integer = da.readingOffset;
+          return hasNext();
+        } else {
+          return false;
         }
       }
     }
-
   }
 
   private static final Logger logger = LoggerFactory.getLogger(FastDataList.class);
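
FastDataList keeps its own hasNext() because its records are framed with a fixed two-byte little-endian length rather than the VarInt used by DataList; the hunk above only swaps the inlined block-advance logic for the shared switchToNextBlock() helper. A stand-alone sketch of that framing, with a made-up helper name:

/*
 * Sketch only: decoding FastDataList's two-byte little-endian length prefix.
 * Masking with 0xff keeps lengths above 127 positive.
 */
static int readFixedLengthPrefix(final byte[] buffer, final int readOffset)
{
  return (buffer[readOffset] & 0xff) | ((buffer[readOffset + 1] & 0xff) << 8);
}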

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7fb4823..33a2442 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -97,16 +97,16 @@ public class Server implements ServerListener
   @Override
   public void unregistered(SelectionKey key)
   {
-    serverHelperExecutor.shutdown();
-    storageHelperExecutor.shutdown();
-    try {
-      serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
-    }
-    catch (InterruptedException ex) {
-      logger.debug("Executor Termination", ex);
-    }
-    logger.info("Server stopped listening at {}", address);
-  }
+        serverHelperExecutor.shutdown();
+        storageHelperExecutor.shutdown();
+        try {
+          serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex) {
+          logger.debug("Executor Termination", ex);
+        }
+        logger.info("Server stopped listening at {}", address);
+      }
 
   public synchronized InetSocketAddress run(EventLoop eventloop)
   {
@@ -262,7 +262,7 @@ public class Server implements ServerListener
         //logger.debug("old list = {}", dl);
       }
       else {
-        dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0);
+        dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
         publisherBuffers.put(upstream_identifier, dl);
         //logger.debug("new list = {}", dl);
       }
@@ -401,7 +401,7 @@ public class Server implements ServerListener
           PublishRequestTuple publisherRequest = (PublishRequestTuple)request;
 
           DataList dl = handlePublisherRequest(publisherRequest, this);
-          dl.setAutoflushExecutor(serverHelperExecutor);
+          dl.setAutoFlushExecutor(serverHelperExecutor);
 
           Publisher publisher;
           if (publisherRequest.getVersion().equals(Tuple.FAST_VERSION)) {
@@ -616,6 +616,32 @@ public class Server implements ServerListener
       dirty = true;
     }
 
+    /**
+     * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+     * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)}
+     * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key
+     * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+     * @return true
+     */
+    @Override
+    public boolean resumeReadIfSuspended()
+    {
+      eventloop.submit(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          final int interestOps = key.interestOps();
+          if ((interestOps & SelectionKey.OP_READ) == 0) {
+            logger.debug("Resuming read on key {} with attachment {}", key, key.attachment());
+            read(0);
+            key.interestOps(interestOps | SelectionKey.OP_READ);
+          }
+        }
+      });
+      return true;
+    }
+
     @Override
     public void read(int len)
     {
@@ -634,7 +660,9 @@ public class Server implements ServerListener
                    * so we allocate a new byteBuffer and copy over the partially written data to the
                    * new byteBuffer and start as if we always had full room but not enough data.
                    */
-                  switchToNewBuffer(buffer, readOffset);
+                  if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) {
+                    return;
+                  }
                 }
               }
               else if (dirty) {
@@ -660,10 +688,13 @@ public class Server implements ServerListener
             /*
              * hit wall while writing serialized data, so have to allocate a new byteBuffer.
              */
-            switchToNewBuffer(buffer, readOffset - VarInt.getSize(size));
+            if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) {
+              readOffset -= VarInt.getSize(size);
+              size = 0;
+              return;
+            }
             size = 0;
-          }
-          else if (dirty) {
+          } else if (dirty) {
             dirty = false;
             datalist.flush(writeOffset);
           }
@@ -673,21 +704,33 @@ public class Server implements ServerListener
       while (true);
     }
 
-    public void switchToNewBuffer(byte[] array, int offset)
+    private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset)
     {
-      byte[] newBuffer = new byte[datalist.getBlockSize()];
-      byteBuffer = ByteBuffer.wrap(newBuffer);
-      if (array == null || array.length - offset == 0) {
-        writeOffset = 0;
+      if (switchToNewBuffer(array, offset)) {
+        return true;
       }
-      else {
-        writeOffset = array.length - offset;
-        System.arraycopy(buffer, offset, newBuffer, 0, writeOffset);
-        byteBuffer.position(writeOffset);
+      datalist.suspendRead(this);
+      return false;
+    }
+
+    private boolean switchToNewBuffer(final byte[] array, final int offset)
+    {
+      if (datalist.isMemoryBlockAvailable()) {
+        final byte[] newBuffer = datalist.newBuffer();
+        byteBuffer = ByteBuffer.wrap(newBuffer);
+        if (array == null || array.length - offset == 0) {
+          writeOffset = 0;
+        } else {
+          writeOffset = array.length - offset;
+          System.arraycopy(buffer, offset, newBuffer, 0, writeOffset);
+          byteBuffer.position(writeOffset);
+        }
+        buffer = newBuffer;
+        readOffset = 0;
+        datalist.addBuffer(buffer);
+        return true;
       }
-      buffer = newBuffer;
-      readOffset = 0;
-      datalist.addBuffer(buffer);
+      return false;
     }
 
     @Override
@@ -714,7 +757,7 @@ public class Server implements ServerListener
     @Override
     public String toString()
     {
-      return "Server.Publisher{" + "datalist=" + datalist + '}';
+      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + datalist + '}';
     }
 
     private volatile boolean torndown;
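
The Server.java hunks wire the permit accounting into the NIO read path: when switchToNewBuffer() cannot obtain a memory block, the Publisher suspends its socket reads and is parked in the DataList's suspended-clients list; once a block is spooled or purged, resumeReadIfSuspended() drains whatever was already buffered and re-registers OP_READ on the event loop. Below is a minimal stand-alone sketch of that interest-ops toggle; the ReadToggle class and the processPendingData callback are placeholders, while the actual patch goes through netlet's AbstractClient and the server event loop.

import java.nio.channels.SelectionKey;

/*
 * Sketch only: suspending and resuming interest in OP_READ on a SelectionKey.
 */
class ReadToggle
{
  private final SelectionKey key;

  ReadToggle(final SelectionKey key)
  {
    this.key = key;
  }

  // Stop the selector from delivering further reads while memory is exhausted.
  void suspendRead()
  {
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
  }

  // Drain anything already buffered, then re-register interest in reads.
  void resumeRead(final Runnable processPendingData)
  {
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_READ) == 0) {
      processPendingData.run();
      key.interestOps(interestOps | SelectionKey.OP_READ);
    }
  }
}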



[2/2] incubator-apex-core git commit: Merge branch 'APEX-68' of github.com:vrozov/incubator-apex-core into vlad-apex-68

Posted by ch...@apache.org.
Merge branch 'APEX-68' of github.com:vrozov/incubator-apex-core into vlad-apex-68


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/928d3680
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/928d3680
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/928d3680

Branch: refs/heads/devel-3
Commit: 928d36804b695e35e8a59cc8acf3aabc23b296a9
Parents: 97cbef6 1b8aecf
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Thu Sep 10 17:58:24 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Thu Sep 10 17:58:24 2015 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 598 +++++++++++--------
 .../bufferserver/internal/FastDataList.java     |  64 +-
 .../datatorrent/bufferserver/server/Server.java | 101 +++-
 3 files changed, 425 insertions(+), 338 deletions(-)
----------------------------------------------------------------------