You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/19 21:56:37 UTC

[2/2] incubator-asterixdb git commit: Add Unit Tests for Feed Runtime Input Handler

Add Unit Tests for Feed Runtime Input Handler

Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
Reviewed-on: https://asterix-gerrit.ics.uci.edu/866
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mi...@couchbase.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/803a3a2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/803a3a2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/803a3a2f

Branch: refs/heads/master
Commit: 803a3a2faeb82eff07d821e137806c878366908e
Parents: f815395
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Thu May 19 14:26:09 2016 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Thu May 19 14:55:35 2016 -0700

----------------------------------------------------------------------
 .../feed/dataflow/FeedRuntimeInputHandler.java  | 222 +++++-
 .../external/feed/dataflow/FrameAction.java     |  42 +-
 .../external/feed/dataflow/FrameSpiller.java    |  13 +-
 .../feed/management/ConcurrentFramePool.java    | 107 ++-
 .../external/feed/runtime/FeedRuntimeId.java    |   4 +-
 .../feed/test/ConcurrentFramePoolUnitTest.java  | 613 ++++++++++++++
 .../feed/test/FeedMemoryManagerUnitTest.java    | 482 -----------
 .../external/feed/test/InputHandlerTest.java    | 794 +++++++++++++++++++
 hyracks-fullstack/hyracks/hyracks-api/pom.xml   |  35 +-
 .../hyracks/api/test/CountAndThrowError.java    |  42 +
 .../api/test/CountAndThrowException.java        |  42 +
 .../apache/hyracks/api/test/CountAnswer.java    |  46 ++
 .../hyracks/api/test/FrameWriterTestUtils.java  |  65 ++
 .../api/test/TestControlledFrameWriter.java     |  82 ++
 .../hyracks/api/test/TestFrameWriter.java       | 173 ++++
 .../hyracks/hyracks-storage-am-btree/pom.xml    |   7 +
 .../storage/am/btree/test/FramewriterTest.java  |  58 +-
 17 files changed, 2222 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3920a03..d201a6a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /**
- * TODO: Add unit test cases for this class
+ * TODO: Add Failure cases unit tests for this class
  * Provides for error-handling and input-side buffering for a feed runtime.
  * .............______.............
  * ............|......|............
@@ -48,8 +48,9 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperato
 public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
-    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
     private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
+    private static final boolean DEBUG = false;
+    private final Object mutex = new Object();
     private final FeedExceptionHandler exceptionHandler;
     private final FrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
@@ -58,15 +59,16 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
     private final FrameTransporter consumer;
     private final Thread consumerThread;
     private final LinkedBlockingDeque<ByteBuffer> inbox;
-    private final ConcurrentFramePool memoryManager;
+    private final ConcurrentFramePool framePool;
     private Mode mode = Mode.PROCESS;
+    private int total = 0;
     private int numDiscarded = 0;
     private int numSpilled = 0;
     private int numProcessedInMemory = 0;
     private int numStalled = 0;
 
     public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager)
+            IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
         this.spiller =
@@ -76,13 +78,13 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
                         fpa.getMaxSpillOnDisk());
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
-        this.memoryManager = feedMemoryManager;
+        this.framePool = framePool;
         this.inbox = new LinkedBlockingDeque<>();
         this.consumer = new FrameTransporter();
-        this.consumerThread = new Thread();
+        this.consumerThread = new Thread(consumer);
         this.consumerThread.start();
         this.initialFrameSize = ctx.getInitialFrameSize();
-        this.frameAction = new FrameAction(inbox);
+        this.frameAction = new FrameAction();
     }
 
     @Override
@@ -101,15 +103,20 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
 
     @Override
     public void close() throws HyracksDataException {
-        inbox.add(POISON_PILL);
-        notify();
+        consumer.poison();
+        synchronized (mutex) {
+            if (DEBUG) {
+                LOGGER.info("Producer is waking up consumer");
+            }
+            mutex.notify();
+        }
         try {
             consumerThread.join();
         } catch (InterruptedException e) {
             LOGGER.log(Level.WARNING, e.getMessage(), e);
         }
         try {
-            memoryManager.release(inbox);
+            framePool.release(inbox);
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
@@ -124,9 +131,13 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
     @Override
     public void nextFrame(ByteBuffer frame) throws HyracksDataException {
         try {
+            total++;
             if (consumer.cause() != null) {
                 throw consumer.cause();
             }
+            if (DEBUG) {
+                LOGGER.info("nextFrame() called. inputHandler is in mode: " + mode.toString());
+            }
             switch (mode) {
                 case PROCESS:
                     process(frame);
@@ -148,25 +159,42 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
         }
     }
 
+    // For unit testing purposes
+    public int framesOnDisk() {
+        return spiller.remaining();
+    }
+
     private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException {
         int numFrames = frameSize / initialFrameSize;
         if (numFrames == 1) {
-            return memoryManager.get();
+            return framePool.get();
         } else {
-            return memoryManager.get(frameSize);
+            return framePool.get(frameSize);
         }
     }
 
     private void discard(ByteBuffer frame) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.info("starting discard(frame)");
+        }
         if (fpa.spillToDiskOnCongestion()) {
+            if (DEBUG) {
+                LOGGER.info("Spilling to disk is enabled. Will try that");
+            }
             if (spiller.spill(frame)) {
                 numSpilled++;
                 mode = Mode.SPILL;
                 return;
             }
         } else {
+            if (DEBUG) {
+                LOGGER.info("Spilling to disk is disabled. Will try to get a buffer");
+            }
             ByteBuffer next = getFreeBuffer(frame.capacity());
             if (next != null) {
+                if (DEBUG) {
+                    LOGGER.info("Was able to get a buffer");
+                }
                 numProcessedInMemory++;
                 next.put(frame);
                 inbox.offer(next);
@@ -174,102 +202,168 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
                 return;
             }
         }
-        numDiscarded++;
+        if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) {
+            if (DEBUG) {
+                LOGGER.info("in discard(frame). Discard allowance has been consumed. --> Stalling");
+            }
+            stall(frame);
+        } else {
+            if (DEBUG) {
+                LOGGER.info("in discard(frame). So far, I have discarded " + numDiscarded);
+            }
+            numDiscarded++;
+        }
     }
 
-    private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+    private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
         if (fpa.spillToDiskOnCongestion()) {
             mode = Mode.SPILL;
             spiller.open();
             spill(frame);
         } else {
+            if (DEBUG) {
+                LOGGER.info("Spilling is disabled --> discardOrStall(frame)");
+            }
             discardOrStall(frame);
         }
     }
 
     private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
         if (fpa.discardOnCongestion()) {
-            numDiscarded++;
             mode = Mode.DISCARD;
             discard(frame);
         } else {
+            if (DEBUG) {
+                LOGGER.info("Discard is disabled --> stall(frame)");
+            }
             stall(frame);
         }
     }
 
     private void stall(ByteBuffer frame) throws HyracksDataException {
         try {
+            if (DEBUG) {
+                LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+            }
             numStalled++;
             // If spilling is enabled, we wait on the spiller
             if (fpa.spillToDiskOnCongestion()) {
-                synchronized (spiller) {
-                    while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
-                        spiller.wait();
-                    }
+                if (DEBUG) {
+                    LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
                 }
+                waitforSpillSpace();
                 spiller.spill(frame);
-                synchronized (this) {
-                    notify();
+                numSpilled++;
+                synchronized (mutex) {
+                    if (DEBUG) {
+                        LOGGER.info("Producer is waking up consumer");
+                    }
+                    mutex.notify();
                 }
                 return;
             }
+            if (DEBUG) {
+                LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+            }
             // Spilling is disabled, we subscribe to feedMemoryManager
             frameAction.setFrame(frame);
-            synchronized (frameAction) {
-                if (memoryManager.subscribe(frameAction)) {
-                    frameAction.wait();
-                }
+            framePool.subscribe(frameAction);
+            ByteBuffer temp = frameAction.retrieve();
+            inbox.put(temp);
+            numProcessedInMemory++;
+            if (DEBUG) {
+                LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
             }
-            synchronized (this) {
-                notify();
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
             }
         } catch (InterruptedException e) {
             throw new HyracksDataException(e);
         }
     }
 
+    private void waitforSpillSpace() throws InterruptedException {
+        synchronized (spiller) {
+            while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
+                if (DEBUG) {
+                    LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than "
+                            + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = "
+                            + spiller.usedBudget());
+                }
+                spiller.wait();
+            }
+        }
+    }
+
     private void process(ByteBuffer frame) throws HyracksDataException {
-        // Get a page from
-        ByteBuffer next = getFreeBuffer(frame.capacity());
+        // Get a page from frame pool
+        ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
         if (next != null) {
+            // Got a page from memory pool
             numProcessedInMemory++;
             next.put(frame);
-            inbox.offer(next);
-            if (inbox.size() == 1) {
-                synchronized (this) {
-                    notify();
-                }
+            try {
+                inbox.put(next);
+                notifyMemoryConsumer();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
             }
         } else {
-            // out of memory. we switch to next mode as per policy -- synchronized method
+            if (DEBUG) {
+                LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
+            }
+            // Out of memory. we switch to next mode as per policy
             exitProcessState(frame);
         }
     }
 
+    private void notifyMemoryConsumer() {
+        if (inbox.size() == 1) {
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
+            }
+        }
+    }
+
     private void spill(ByteBuffer frame) throws HyracksDataException {
         if (spiller.switchToMemory()) {
-            synchronized (this) {
+            synchronized (mutex) {
                 // Check if there is memory
-                ByteBuffer next = getFreeBuffer(frame.capacity());
+                ByteBuffer next = null;
+                if (frame.capacity() <= framePool.getMaxFrameSize()) {
+                    next = getFreeBuffer(frame.capacity());
+                }
                 if (next != null) {
                     spiller.close();
                     numProcessedInMemory++;
                     next.put(frame);
                     inbox.offer(next);
+                    notifyMemoryConsumer();
                     mode = Mode.PROCESS;
                 } else {
-                    // spill. This will always succeed since spilled = 0 (must verify that budget can't be 0)
+                    // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
                     spiller.spill(frame);
                     numSpilled++;
-                    notify();
+                    if (DEBUG) {
+                        LOGGER.info("Producer is waking up consumer");
+                    }
+                    mutex.notify();
                 }
             }
         } else {
             // try to spill. If failed switch to either discard or stall
             if (spiller.spill(frame)) {
+                notifyDiskConsumer();
                 numSpilled++;
             } else {
                 if (fpa.discardOnCongestion()) {
+                    mode = Mode.DISCARD;
                     discard(frame);
                 } else {
                     stall(frame);
@@ -278,11 +372,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
         }
     }
 
+    private void notifyDiskConsumer() {
+        if (spiller.remaining() == 1) {
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
+            }
+        }
+    }
+
     public Mode getMode() {
         return mode;
     }
 
-    public synchronized void setMode(Mode mode) {
+    public void setMode(Mode mode) {
         this.mode = mode;
     }
 
@@ -311,15 +416,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
 
     private class FrameTransporter implements Runnable {
         private volatile Throwable cause;
+        private int consumed = 0;
+        private boolean poisoned = false;
 
         public Throwable cause() {
             return cause;
         }
 
+        public void poison() {
+            poisoned = true;
+        }
+
         private Throwable consume(ByteBuffer frame) {
             while (frame != null) {
                 try {
                     writer.nextFrame(frame);
+                    consumed++;
                     frame = null;
                 } catch (HyracksDataException e) {
                     // It is fine to catch throwable here since this thread is always expected to terminate gracefully
@@ -340,7 +452,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
         public void run() {
             try {
                 ByteBuffer frame = inbox.poll();
-                while (frame != POISON_PILL) {
+                while (true) {
                     if (frame != null) {
                         try {
                             if (consume(frame) != null) {
@@ -348,7 +460,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
                             }
                         } finally {
                             // Done with frame.
-                            memoryManager.release(frame);
+                            framePool.release(frame);
                         }
                     }
                     frame = inbox.poll();
@@ -366,13 +478,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
                         writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if there is 0 memory?
-                        synchronized (FeedRuntimeInputHandler.this) {
+                        synchronized (mutex) {
                             frame = inbox.poll();
                             if (frame == null) {
                                 // Nothing in memory
                                 if (spiller.switchToMemory()) {
+                                    if (poisoned) {
+                                        break;
+                                    }
+                                    if (DEBUG) {
+                                        LOGGER.info("Consumer is going to sleep");
+                                    }
                                     // Nothing in disk
-                                    FeedRuntimeInputHandler.this.wait();
+                                    mutex.wait();
+                                    if (DEBUG) {
+                                        LOGGER.info("Consumer is waking up");
+                                    }
                                 }
                             }
                         }
@@ -383,5 +504,18 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
             }
             // cleanup will always be done through the close() call
         }
+
+        @Override
+        public String toString() {
+            return "consumed: " + consumed;
+        }
+    }
+
+    public int getTotal() {
+        return total;
+    }
+
+    public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+        return inbox;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
index 4a2120a..f02b4aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -19,29 +19,45 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
 
-import rx.functions.Action1;
+import org.apache.log4j.Logger;
 
-public class FrameAction implements Action1<ByteBuffer> {
-    private final LinkedBlockingDeque<ByteBuffer> inbox;
+public class FrameAction {
+    private static final boolean DEBUG = false;
+    private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+    private ByteBuffer allocated;
     private ByteBuffer frame;
 
-    public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
     public void call(ByteBuffer freeFrame) {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: My subscription is being answered");
+        }
         freeFrame.put(frame);
-        inbox.add(freeFrame);
         synchronized (this) {
-            notify();
+            allocated = freeFrame;
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Waking up waiting threads");
+            }
+            notifyAll();
         }
     }
 
-    public ByteBuffer getFrame() {
-        return frame;
+    public synchronized ByteBuffer retrieve() throws InterruptedException {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: Attempting to get allocated buffer");
+        }
+        while (allocated == null) {
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+            }
+            wait();
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Awoken Up");
+            }
+        }
+        ByteBuffer temp = allocated;
+        allocated = null;
+        return temp;
     }
 
     public void setFrame(ByteBuffer frame) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index f0d226a..a2f19bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -44,6 +44,7 @@ import org.apache.log4j.Logger;
 public class FrameSpiller {
     private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName());
     private static final int FRAMES_PER_FILE = 1024;
+    public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
 
     private final String fileNamePrefix;
     private final ArrayDeque<File> files = new ArrayDeque<>();
@@ -88,11 +89,11 @@ public class FrameSpiller {
     }
 
     public synchronized ByteBuffer next() throws HyracksDataException {
+        frame.reset();
+        if (totalReadCount == totalWriteCount) {
+            return null;
+        }
         try {
-            frame.reset();
-            if (totalReadCount == totalWriteCount) {
-                return null;
-            }
             if (currentReadFile == null) {
                 if (!files.isEmpty()) {
                     currentReadFile = files.pop();
@@ -126,6 +127,10 @@ public class FrameSpiller {
             return frame.getBuffer();
         } catch (Exception e) {
             throw new HyracksDataException(e);
+        } finally {
+            synchronized (this) {
+                notify();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
index 25aa86a..e5543d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -28,10 +28,15 @@ import java.util.Map.Entry;
 
 import org.apache.asterix.external.feed.dataflow.FrameAction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
 
 public class ConcurrentFramePool {
+    private static final boolean DEBUG = false;
     private static final String ERROR_INVALID_FRAME_SIZE =
             "The size should be an integral multiple of the default frame size";
+    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+            "The requested frame size must not be greater than the allocated budget";
+    private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
     private final String nodeId;
     private final int budget;
     private final int defaultFrameSize;
@@ -49,11 +54,31 @@ public class ConcurrentFramePool {
         this.largeFramesPools = new HashMap<>();
     }
 
+    public int getMaxFrameSize() {
+        return budget * defaultFrameSize;
+    }
+
     public synchronized ByteBuffer get() {
+        // Subscribers have higher priority
+        if (subscribers.isEmpty()) {
+            return doGet();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    private ByteBuffer doGet() {
         if (handedOut < budget) {
             handedOut++;
             return allocate();
         }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = 1");
+        }
         return null;
     }
 
@@ -61,11 +86,15 @@ public class ConcurrentFramePool {
         return budget - handedOut;
     }
 
-    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+    private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+        // Subscribers have higher priority
         if (bufferSize % defaultFrameSize != 0) {
             throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
         }
         int multiplier = bufferSize / defaultFrameSize;
+        if (multiplier > budget) {
+            throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+        }
         if (handedOut + multiplier <= budget) {
             handedOut += multiplier;
             ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
@@ -76,9 +105,26 @@ public class ConcurrentFramePool {
                 created += multiplier;
                 return ByteBuffer.allocate(bufferSize);
             }
-            return largeFramesPool.poll();
+            ByteBuffer buffer = largeFramesPool.poll();
+            buffer.clear();
+            return buffer;
         }
         // Not enough budget
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = " + multiplier);
+        }
+        return null;
+    }
+
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        if (subscribers.isEmpty()) {
+            return doGet(bufferSize);
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
         return null;
     }
 
@@ -121,7 +167,9 @@ public class ConcurrentFramePool {
             created++;
             return ByteBuffer.allocate(defaultFrameSize);
         } else {
-            return pool.pop();
+            ByteBuffer buffer = pool.pop();
+            buffer.clear();
+            return buffer;
         }
     }
 
@@ -150,6 +198,9 @@ public class ConcurrentFramePool {
     public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
         int multiples = buffer.capacity() / defaultFrameSize;
         handedOut -= multiples;
+        if (DEBUG) {
+            LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+        }
         if (multiples == 1) {
             pool.add(buffer);
         } else {
@@ -163,22 +214,48 @@ public class ConcurrentFramePool {
         // check subscribers
         while (!subscribers.isEmpty()) {
             FrameAction frameAction = subscribers.peek();
+            ByteBuffer freeBuffer;
             // check if we have enough and answer immediately.
             if (frameAction.getSize() == defaultFrameSize) {
-                buffer = get();
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+                }
+                freeBuffer = doGet();
             } else {
-                buffer = get(frameAction.getSize());
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested "
+                            + frameAction.getSize() / defaultFrameSize + " frames");
+                }
+                freeBuffer = doGet(frameAction.getSize());
             }
-            if (buffer != null) {
+            if (freeBuffer != null) {
+                int handedOutBeforeCall = handedOut;
                 try {
-                    frameAction.call(buffer);
+                    frameAction.call(freeBuffer);
+                } catch (Exception e) {
+                    LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+                    // TODO(amoudi): Add test cases and get rid of recursion
+                    if (handedOut == handedOutBeforeCall) {
+                        release(freeBuffer);
+                    }
+                    throw e;
                 } finally {
                     subscribers.remove();
+                    if (DEBUG) {
+                        LOGGER.info(
+                                "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+                    }
                 }
             } else {
+                if (DEBUG) {
+                    LOGGER.info("Failed to allocate requested frames");
+                }
                 break;
             }
         }
+        if (DEBUG) {
+            LOGGER.info(subscribers.size() + " remaining subscribers");
+        }
     }
 
     public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
@@ -187,18 +264,30 @@ public class ConcurrentFramePool {
             ByteBuffer buffer;
             // check if we have enough and answer immediately.
             if (frameAction.getSize() == defaultFrameSize) {
-                buffer = get();
+                buffer = doGet();
             } else {
-                buffer = get(frameAction.getSize());
+                buffer = doGet(frameAction.getSize());
             }
             if (buffer != null) {
                 frameAction.call(buffer);
                 // There is no need to subscribe. perform action and return false
                 return false;
             }
+        } else {
+            int multiplier = frameAction.getSize() / defaultFrameSize;
+            if (multiplier > budget) {
+                throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+            }
         }
         // none of the above, add to subscribers and return true
         subscribers.add(frameAction);
         return true;
     }
+
+    /*
+     * For unit testing purposes
+     */
+    public Collection<FrameAction> getSubscribers() {
+        return subscribers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
index f888542..18d4cff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -35,11 +35,11 @@ public class FeedRuntimeId implements Serializable {
     private final String targetId;
     private final int hashCode;
 
-    public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String operandId) {
+    public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
         this.feedId = feedId;
         this.runtimeType = runtimeType;
         this.partition = partition;
-        this.targetId = operandId;
+        this.targetId = targetId;
         this.hashCode = toString().hashCode();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
new file mode 100644
index 0000000..444d8a5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class ConcurrentFramePoolUnitTest extends TestCase {
+
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int NUM_FRAMES = 2048;
+    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+    private static final int NUM_THREADS = 8;
+    private static final int MAX_SIZE = 52;
+    private static final double RELEASE_PROBABILITY = 0.20;
+    private volatile static HyracksDataException cause = null;
+
+    public ConcurrentFramePoolUnitTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(ConcurrentFramePoolUnitTest.class);
+    }
+
+    @org.junit.Test
+    public void testMemoryManager() {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        int i = 0;
+        while (fmm.get() != null) {
+            i++;
+        }
+        Assert.assertEquals(i, NUM_FRAMES);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            int i = 0;
+            int req;
+            while (true) {
+                req = random.nextInt(MAX_SIZE) + 1;
+                if (req == 1) {
+                    if (fmm.get() != null) {
+                        i += 1;
+                    } else {
+                        break;
+                    }
+                } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                    i += req;
+                } else {
+                    break;
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+
+            VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int allocated = 0;
+            for (int i = 0; i < threads.length; i++) {
+                if (runners[i].cause() != null) {
+                    runners[i].cause().printStackTrace();
+                    Assert.fail(runners[i].cause().getMessage());
+                }
+                allocated += runners[i].getAllocated();
+            }
+            Assert.assertEquals(allocated <= NUM_FRAMES, true);
+            for (int i = 0; i < threads.length; i++) {
+                Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true);
+            }
+            Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseMemoryManager() throws HyracksDataException {
+        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+        ConcurrentFramePool fmm =
+                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        Random random = new Random();
+        ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        while (true) {
+            if (random.nextDouble() < RELEASE_PROBABILITY) {
+                if (!stack.isEmpty()) {
+                    fmm.release(stack.pop());
+                }
+            } else {
+                ByteBuffer buffer = fmm.get();
+                if (buffer == null) {
+                    break;
+                } else {
+                    stack.push(buffer);
+                }
+            }
+        }
+        Assert.assertEquals(stack.size(), NUM_FRAMES);
+        Assert.assertEquals(fmm.remaining(), 0);
+        for (ByteBuffer buffer : stack) {
+            fmm.release(buffer);
+        }
+        stack.clear();
+        Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (FixedSizeGoodAllocator allocator : runners) {
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            Random random = new Random();
+            ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+            int i = 0;
+            int req;
+            while (true) {
+                // release
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        ByteBuffer buffer = stack.pop();
+                        i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+                        fmm.release(buffer);
+                    }
+                } else {
+                    // acquire
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        ByteBuffer buffer = fmm.get();
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += 1;
+                        } else {
+                            break;
+                        }
+                    } else {
+                        ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+                        if (buffer != null) {
+                            stack.push(buffer);
+                            i += req;
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            Assert.assertEquals(i <= NUM_FRAMES, true);
+            Assert.assertEquals(i + req > NUM_FRAMES, true);
+            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS];
+            Thread[] threads = new Thread[NUM_THREADS];
+            Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm));
+            for (int i = 0; i < threads.length; i++) {
+                threads[i] = new Thread(runners[i]);
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].start();
+            }
+            for (int i = 0; i < threads.length; i++) {
+                threads[i].join();
+            }
+            int i = 0;
+            for (VarSizeGoodAllocator allocator : runners) {
+                if (allocator.cause() != null) {
+                    allocator.cause().printStackTrace();
+                    Assert.fail(allocator.cause().getMessage());
+                }
+                i += allocator.getAllocated();
+            }
+            Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testFixedSizeSubscribtion() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+            Assert.assertEquals(0, fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetRequests() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            fmm.get(32 * DEFAULT_FRAME_SIZE);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetSubscribe() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            fmm.subscribe(frameAction);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testgetWhileSubscribersExist() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffers.put(frameAction.retrieve());
+            // Create another subscriber that takes frames of double the size
+            ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+            LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>();
+            FrameAction frameActionTimes2 = new FrameAction();
+            frameActionTimes2.setFrame(bufferTimes2);
+            Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+            // release a small one
+            fmm.release(buffers.take());
+            Assert.assertEquals(fmm.remaining(), 1);
+            // Check that a small get fails
+            Assert.assertEquals(null, fmm.get());
+            // release another small one
+            fmm.release(buffers.take());
+            // Check that no small frames exists in the pool since subscriber request was satisfied
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffersTimes2.add(frameActionTimes2.retrieve());
+            fmm.release(buffers);
+            fmm.release(bufferTimes2);
+            Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    /*
+     * Runnables used for unit tests
+     */
+    private class FixedSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+
+        public FixedSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        @Override
+        public void run() {
+            while (fmm.get() != null) {
+                allocated++;
+            }
+        }
+    }
+
+    private class FixedSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+        private final Random random = new Random();
+
+        public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return stack.size();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                if (random.nextDouble() < RELEASE_PROBABILITY) {
+                    if (!stack.isEmpty()) {
+                        try {
+                            fmm.release(stack.pop());
+                        } catch (HyracksDataException e) {
+                            e.printStackTrace();
+                            cause = e;
+                        }
+                    }
+                } else {
+                    ByteBuffer buffer = fmm.get();
+                    if (buffer == null) {
+                        break;
+                    } else {
+                        stack.push(buffer);
+                    }
+                }
+            }
+        }
+    }
+
+    private class VarSizeGoodAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+
+        public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    if (random.nextDouble() < RELEASE_PROBABILITY) {
+                        if (!stack.isEmpty()) {
+                            ByteBuffer buffer = stack.pop();
+                            allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+                            fmm.release(buffer);
+                        }
+                    } else {
+                        req = random.nextInt(MAX_SIZE) + 1;
+                        if (req == 1) {
+                            ByteBuffer buffer = fmm.get();
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += 1;
+                            } else {
+                                break;
+                            }
+                        } else {
+                            ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+                            if (buffer != null) {
+                                stack.push(buffer);
+                                allocated += req;
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+
+    private class VarSizeAllocator implements Runnable {
+        private final ConcurrentFramePool fmm;
+        private int allocated = 0;
+        private int req = 0;
+        private final Random random = new Random();
+        private Throwable cause;
+
+        public VarSizeAllocator(ConcurrentFramePool fmm) {
+            this.fmm = fmm;
+        }
+
+        public int getAllocated() {
+            return allocated;
+        }
+
+        public int getLastReq() {
+            return req;
+        }
+
+        public Throwable cause() {
+            return cause;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    req = random.nextInt(MAX_SIZE) + 1;
+                    if (req == 1) {
+                        if (fmm.get() != null) {
+                            allocated += 1;
+                        } else {
+                            break;
+                        }
+                    } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+                        allocated += req;
+                    } else {
+                        break;
+                    }
+                }
+            } catch (Throwable th) {
+                this.cause = th;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
deleted file mode 100644
index 8a6d1b6..0000000
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-@RunWith(PowerMockRunner.class)
-public class FeedMemoryManagerUnitTest extends TestCase {
-
-    private static final int DEFAULT_FRAME_SIZE = 32768;
-    private static final int NUM_FRAMES = 2048;
-    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
-    private static final int NUM_THREADS = 8;
-    private static final int MAX_SIZE = 52;
-    private static final double RELEASE_PROBABILITY = 0.20;
-
-    public FeedMemoryManagerUnitTest(String testName) {
-        super(testName);
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite() {
-        return new TestSuite(FeedMemoryManagerUnitTest.class);
-    }
-
-    @org.junit.Test
-    public void testMemoryManager() {
-        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-        int i = 0;
-        while (fmm.get() != null) {
-            i++;
-        }
-        Assert.assertEquals(i, NUM_FRAMES);
-    }
-
-    @org.junit.Test
-    public void testConcurrentMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (FixedSizeAllocator allocator : runners) {
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    @org.junit.Test
-    public void testVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            Random random = new Random();
-            int i = 0;
-            int req;
-            while (true) {
-                req = random.nextInt(MAX_SIZE) + 1;
-                if (req == 1) {
-                    if (fmm.get() != null) {
-                        i += 1;
-                    } else {
-                        break;
-                    }
-                } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
-                    i += req;
-                } else {
-                    break;
-                }
-            }
-
-            Assert.assertEquals(i <= NUM_FRAMES, true);
-            Assert.assertEquals(i + req > NUM_FRAMES, true);
-            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    @org.junit.Test
-    public void testConcurrentVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-
-            VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int allocated = 0;
-            for (int i = 0; i < threads.length; i++) {
-                if (runners[i].cause() != null) {
-                    runners[i].cause().printStackTrace();
-                    Assert.fail(runners[i].cause().getMessage());
-                }
-                allocated += runners[i].getAllocated();
-            }
-            Assert.assertEquals(allocated <= NUM_FRAMES, true);
-            for (int i = 0; i < threads.length; i++) {
-                Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true);
-            }
-            Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    @org.junit.Test
-    public void testAcquireReleaseMemoryManager() throws HyracksDataException {
-        AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-        Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-        Random random = new Random();
-        ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-        while (true) {
-            if (random.nextDouble() < RELEASE_PROBABILITY) {
-                if (!stack.isEmpty()) {
-                    fmm.release(stack.pop());
-                }
-            } else {
-                ByteBuffer buffer = fmm.get();
-                if (buffer == null) {
-                    break;
-                } else {
-                    stack.push(buffer);
-                }
-            }
-        }
-        Assert.assertEquals(stack.size(), NUM_FRAMES);
-        Assert.assertEquals(fmm.remaining(), 0);
-        for (ByteBuffer buffer : stack) {
-            fmm.release(buffer);
-        }
-        stack.clear();
-        Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
-    }
-
-    @org.junit.Test
-    public void testConcurrentAcquireReleaseMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (FixedSizeGoodAllocator allocator : runners) {
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    @org.junit.Test
-    public void testAcquireReleaseVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            Random random = new Random();
-            ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-            int i = 0;
-            int req;
-            while (true) {
-                // release
-                if (random.nextDouble() < RELEASE_PROBABILITY) {
-                    if (!stack.isEmpty()) {
-                        ByteBuffer buffer = stack.pop();
-                        i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
-                        fmm.release(buffer);
-                    }
-                } else {
-                    // acquire
-                    req = random.nextInt(MAX_SIZE) + 1;
-                    if (req == 1) {
-                        ByteBuffer buffer = fmm.get();
-                        if (buffer != null) {
-                            stack.push(buffer);
-                            i += 1;
-                        } else {
-                            break;
-                        }
-                    } else {
-                        ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
-                        if (buffer != null) {
-                            stack.push(buffer);
-                            i += req;
-                        } else {
-                            break;
-                        }
-                    }
-                }
-            }
-
-            Assert.assertEquals(i <= NUM_FRAMES, true);
-            Assert.assertEquals(i + req > NUM_FRAMES, true);
-            Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    @org.junit.Test
-    public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
-        try {
-            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
-            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-            VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS];
-            Thread[] threads = new Thread[NUM_THREADS];
-            Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm));
-            for (int i = 0; i < threads.length; i++) {
-                threads[i] = new Thread(runners[i]);
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].start();
-            }
-            for (int i = 0; i < threads.length; i++) {
-                threads[i].join();
-            }
-            int i = 0;
-            for (VarSizeGoodAllocator allocator : runners) {
-                if (allocator.cause() != null) {
-                    allocator.cause().printStackTrace();
-                    Assert.fail(allocator.cause().getMessage());
-                }
-                i += allocator.getAllocated();
-            }
-            Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
-        } catch (Throwable th) {
-            th.printStackTrace();
-            Assert.fail(th.getMessage());
-        }
-    }
-
-    /*
-     * Runnables used for unit tests
-     */
-    private class FixedSizeAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-
-        public FixedSizeAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        @Override
-        public void run() {
-            while (fmm.get() != null) {
-                allocated++;
-            }
-        }
-    }
-
-    private class FixedSizeGoodAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-        private final Random random = new Random();
-
-        public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return stack.size();
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                if (random.nextDouble() < RELEASE_PROBABILITY) {
-                    if (!stack.isEmpty()) {
-                        try {
-                            fmm.release(stack.pop());
-                        } catch (HyracksDataException e) {
-                            Assert.fail();
-                        }
-                    }
-                } else {
-                    ByteBuffer buffer = fmm.get();
-                    if (buffer == null) {
-                        break;
-                    } else {
-                        stack.push(buffer);
-                    }
-                }
-            }
-        }
-    }
-
-    private class VarSizeGoodAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-        private int req = 0;
-        private final Random random = new Random();
-        private Throwable cause;
-        private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-
-        public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        public Throwable cause() {
-            return cause;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    if (random.nextDouble() < RELEASE_PROBABILITY) {
-                        if (!stack.isEmpty()) {
-                            ByteBuffer buffer = stack.pop();
-                            allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
-                            fmm.release(buffer);
-                        }
-                    } else {
-                        req = random.nextInt(MAX_SIZE) + 1;
-                        if (req == 1) {
-                            ByteBuffer buffer = fmm.get();
-                            if (buffer != null) {
-                                stack.push(buffer);
-                                allocated += 1;
-                            } else {
-                                break;
-                            }
-                        } else {
-                            ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
-                            if (buffer != null) {
-                                stack.push(buffer);
-                                allocated += req;
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-                }
-            } catch (Throwable th) {
-                this.cause = th;
-            }
-        }
-    }
-
-    private class VarSizeAllocator implements Runnable {
-        private final ConcurrentFramePool fmm;
-        private int allocated = 0;
-        private int req = 0;
-        private final Random random = new Random();
-        private Throwable cause;
-
-        public VarSizeAllocator(ConcurrentFramePool fmm) {
-            this.fmm = fmm;
-        }
-
-        public int getAllocated() {
-            return allocated;
-        }
-
-        public int getLastReq() {
-            return req;
-        }
-
-        public Throwable cause() {
-            return cause;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    req = random.nextInt(MAX_SIZE) + 1;
-                    if (req == 1) {
-                        if (fmm.get() != null) {
-                            allocated += 1;
-                        } else {
-                            break;
-                        }
-                    } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
-                        allocated += req;
-                    } else {
-                        break;
-                    }
-                }
-            } catch (Throwable th) {
-                this.cause = th;
-            }
-        }
-    }
-}