You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/05/19 21:56:37 UTC
[2/2] incubator-asterixdb git commit: Add Unit Tests for Feed Runtime Input Handler
Add Unit Tests for Feed Runtime Input Handler
Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
Reviewed-on: https://asterix-gerrit.ics.uci.edu/866
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mi...@couchbase.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/803a3a2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/803a3a2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/803a3a2f
Branch: refs/heads/master
Commit: 803a3a2faeb82eff07d821e137806c878366908e
Parents: f815395
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Thu May 19 14:26:09 2016 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Thu May 19 14:55:35 2016 -0700
----------------------------------------------------------------------
.../feed/dataflow/FeedRuntimeInputHandler.java | 222 +++++-
.../external/feed/dataflow/FrameAction.java | 42 +-
.../external/feed/dataflow/FrameSpiller.java | 13 +-
.../feed/management/ConcurrentFramePool.java | 107 ++-
.../external/feed/runtime/FeedRuntimeId.java | 4 +-
.../feed/test/ConcurrentFramePoolUnitTest.java | 613 ++++++++++++++
.../feed/test/FeedMemoryManagerUnitTest.java | 482 -----------
.../external/feed/test/InputHandlerTest.java | 794 +++++++++++++++++++
hyracks-fullstack/hyracks/hyracks-api/pom.xml | 35 +-
.../hyracks/api/test/CountAndThrowError.java | 42 +
.../api/test/CountAndThrowException.java | 42 +
.../apache/hyracks/api/test/CountAnswer.java | 46 ++
.../hyracks/api/test/FrameWriterTestUtils.java | 65 ++
.../api/test/TestControlledFrameWriter.java | 82 ++
.../hyracks/api/test/TestFrameWriter.java | 173 ++++
.../hyracks/hyracks-storage-am-btree/pom.xml | 7 +
.../storage/am/btree/test/FramewriterTest.java | 58 +-
17 files changed, 2222 insertions(+), 605 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3920a03..d201a6a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
/**
- * TODO: Add unit test cases for this class
+ * TODO: Add Failure cases unit tests for this class
* Provides for error-handling and input-side buffering for a feed runtime.
* .............______.............
* ............|......|............
@@ -48,8 +48,9 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperato
public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
- private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
+ private static final boolean DEBUG = false;
+ private final Object mutex = new Object();
private final FeedExceptionHandler exceptionHandler;
private final FrameSpiller spiller;
private final FeedPolicyAccessor fpa;
@@ -58,15 +59,16 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
private final FrameTransporter consumer;
private final Thread consumerThread;
private final LinkedBlockingDeque<ByteBuffer> inbox;
- private final ConcurrentFramePool memoryManager;
+ private final ConcurrentFramePool framePool;
private Mode mode = Mode.PROCESS;
+ private int total = 0;
private int numDiscarded = 0;
private int numSpilled = 0;
private int numProcessedInMemory = 0;
private int numStalled = 0;
public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager)
+ IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
throws HyracksDataException {
this.writer = writer;
this.spiller =
@@ -76,13 +78,13 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
fpa.getMaxSpillOnDisk());
this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
this.fpa = fpa;
- this.memoryManager = feedMemoryManager;
+ this.framePool = framePool;
this.inbox = new LinkedBlockingDeque<>();
this.consumer = new FrameTransporter();
- this.consumerThread = new Thread();
+ this.consumerThread = new Thread(consumer);
this.consumerThread.start();
this.initialFrameSize = ctx.getInitialFrameSize();
- this.frameAction = new FrameAction(inbox);
+ this.frameAction = new FrameAction();
}
@Override
@@ -101,15 +103,20 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
@Override
public void close() throws HyracksDataException {
- inbox.add(POISON_PILL);
- notify();
+ consumer.poison();
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
try {
consumerThread.join();
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
}
try {
- memoryManager.release(inbox);
+ framePool.release(inbox);
} catch (Throwable th) {
LOGGER.log(Level.WARNING, th.getMessage(), th);
}
@@ -124,9 +131,13 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
@Override
public void nextFrame(ByteBuffer frame) throws HyracksDataException {
try {
+ total++;
if (consumer.cause() != null) {
throw consumer.cause();
}
+ if (DEBUG) {
+ LOGGER.info("nextFrame() called. inputHandler is in mode: " + mode.toString());
+ }
switch (mode) {
case PROCESS:
process(frame);
@@ -148,25 +159,42 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
}
}
+ // For unit testing purposes
+ public int framesOnDisk() {
+ return spiller.remaining();
+ }
+
private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException {
int numFrames = frameSize / initialFrameSize;
if (numFrames == 1) {
- return memoryManager.get();
+ return framePool.get();
} else {
- return memoryManager.get(frameSize);
+ return framePool.get(frameSize);
}
}
private void discard(ByteBuffer frame) throws HyracksDataException {
+ if (DEBUG) {
+ LOGGER.info("starting discard(frame)");
+ }
if (fpa.spillToDiskOnCongestion()) {
+ if (DEBUG) {
+ LOGGER.info("Spilling to disk is enabled. Will try that");
+ }
if (spiller.spill(frame)) {
numSpilled++;
mode = Mode.SPILL;
return;
}
} else {
+ if (DEBUG) {
+ LOGGER.info("Spilling to disk is disabled. Will try to get a buffer");
+ }
ByteBuffer next = getFreeBuffer(frame.capacity());
if (next != null) {
+ if (DEBUG) {
+ LOGGER.info("Was able to get a buffer");
+ }
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
@@ -174,102 +202,168 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
return;
}
}
- numDiscarded++;
+ if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) {
+ if (DEBUG) {
+ LOGGER.info("in discard(frame). Discard allowance has been consumed. --> Stalling");
+ }
+ stall(frame);
+ } else {
+ if (DEBUG) {
+ LOGGER.info("in discard(frame). So far, I have discarded " + numDiscarded);
+ }
+ numDiscarded++;
+ }
}
- private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+ private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
if (fpa.spillToDiskOnCongestion()) {
mode = Mode.SPILL;
spiller.open();
spill(frame);
} else {
+ if (DEBUG) {
+ LOGGER.info("Spilling is disabled --> discardOrStall(frame)");
+ }
discardOrStall(frame);
}
}
private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
if (fpa.discardOnCongestion()) {
- numDiscarded++;
mode = Mode.DISCARD;
discard(frame);
} else {
+ if (DEBUG) {
+ LOGGER.info("Discard is disabled --> stall(frame)");
+ }
stall(frame);
}
}
private void stall(ByteBuffer frame) throws HyracksDataException {
try {
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+ }
numStalled++;
// If spilling is enabled, we wait on the spiller
if (fpa.spillToDiskOnCongestion()) {
- synchronized (spiller) {
- while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
- spiller.wait();
- }
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
}
+ waitforSpillSpace();
spiller.spill(frame);
- synchronized (this) {
- notify();
+ numSpilled++;
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
return;
}
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+ }
// Spilling is disabled, we subscribe to feedMemoryManager
frameAction.setFrame(frame);
- synchronized (frameAction) {
- if (memoryManager.subscribe(frameAction)) {
- frameAction.wait();
- }
+ framePool.subscribe(frameAction);
+ ByteBuffer temp = frameAction.retrieve();
+ inbox.put(temp);
+ numProcessedInMemory++;
+ if (DEBUG) {
+ LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
}
- synchronized (this) {
- notify();
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
+ private void waitforSpillSpace() throws InterruptedException {
+ synchronized (spiller) {
+ while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than "
+ + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = "
+ + spiller.usedBudget());
+ }
+ spiller.wait();
+ }
+ }
+ }
+
private void process(ByteBuffer frame) throws HyracksDataException {
- // Get a page from
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ // Get a page from frame pool
+ ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
if (next != null) {
+ // Got a page from memory pool
numProcessedInMemory++;
next.put(frame);
- inbox.offer(next);
- if (inbox.size() == 1) {
- synchronized (this) {
- notify();
- }
+ try {
+ inbox.put(next);
+ notifyMemoryConsumer();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
} else {
- // out of memory. we switch to next mode as per policy -- synchronized method
+ if (DEBUG) {
+ LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
+ }
+ // Out of memory. we switch to next mode as per policy
exitProcessState(frame);
}
}
+ private void notifyMemoryConsumer() {
+ if (inbox.size() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
+ }
+ }
+
private void spill(ByteBuffer frame) throws HyracksDataException {
if (spiller.switchToMemory()) {
- synchronized (this) {
+ synchronized (mutex) {
// Check if there is memory
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ ByteBuffer next = null;
+ if (frame.capacity() <= framePool.getMaxFrameSize()) {
+ next = getFreeBuffer(frame.capacity());
+ }
if (next != null) {
spiller.close();
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
+ notifyMemoryConsumer();
mode = Mode.PROCESS;
} else {
- // spill. This will always succeed since spilled = 0 (must verify that budget can't be 0)
+ // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
spiller.spill(frame);
numSpilled++;
- notify();
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
}
} else {
// try to spill. If failed switch to either discard or stall
if (spiller.spill(frame)) {
+ notifyDiskConsumer();
numSpilled++;
} else {
if (fpa.discardOnCongestion()) {
+ mode = Mode.DISCARD;
discard(frame);
} else {
stall(frame);
@@ -278,11 +372,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
}
}
+ private void notifyDiskConsumer() {
+ if (spiller.remaining() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
+ }
+ }
+
public Mode getMode() {
return mode;
}
- public synchronized void setMode(Mode mode) {
+ public void setMode(Mode mode) {
this.mode = mode;
}
@@ -311,15 +416,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
private class FrameTransporter implements Runnable {
private volatile Throwable cause;
+ private int consumed = 0;
+ private boolean poisoned = false;
public Throwable cause() {
return cause;
}
+ public void poison() {
+ poisoned = true;
+ }
+
private Throwable consume(ByteBuffer frame) {
while (frame != null) {
try {
writer.nextFrame(frame);
+ consumed++;
frame = null;
} catch (HyracksDataException e) {
// It is fine to catch throwable here since this thread is always expected to terminate gracefully
@@ -340,7 +452,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
public void run() {
try {
ByteBuffer frame = inbox.poll();
- while (frame != POISON_PILL) {
+ while (true) {
if (frame != null) {
try {
if (consume(frame) != null) {
@@ -348,7 +460,7 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
}
} finally {
// Done with frame.
- memoryManager.release(frame);
+ framePool.release(frame);
}
}
frame = inbox.poll();
@@ -366,13 +478,22 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
writer.flush();
// At this point. We consumed all memory and spilled
// We can't assume the next will be in memory. what if there is 0 memory?
- synchronized (FeedRuntimeInputHandler.this) {
+ synchronized (mutex) {
frame = inbox.poll();
if (frame == null) {
// Nothing in memory
if (spiller.switchToMemory()) {
+ if (poisoned) {
+ break;
+ }
+ if (DEBUG) {
+ LOGGER.info("Consumer is going to sleep");
+ }
// Nothing in disk
- FeedRuntimeInputHandler.this.wait();
+ mutex.wait();
+ if (DEBUG) {
+ LOGGER.info("Consumer is waking up");
+ }
}
}
}
@@ -383,5 +504,18 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
}
// cleanup will always be done through the close() call
}
+
+ @Override
+ public String toString() {
+ return "consumed: " + consumed;
+ }
+ }
+
+ public int getTotal() {
+ return total;
+ }
+
+ public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+ return inbox;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
index 4a2120a..f02b4aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -19,29 +19,45 @@
package org.apache.asterix.external.feed.dataflow;
import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
-import rx.functions.Action1;
+import org.apache.log4j.Logger;
-public class FrameAction implements Action1<ByteBuffer> {
- private final LinkedBlockingDeque<ByteBuffer> inbox;
+public class FrameAction {
+ private static final boolean DEBUG = false;
+ private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+ private ByteBuffer allocated;
private ByteBuffer frame;
- public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
- this.inbox = inbox;
- }
-
- @Override
public void call(ByteBuffer freeFrame) {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: My subscription is being answered");
+ }
freeFrame.put(frame);
- inbox.add(freeFrame);
synchronized (this) {
- notify();
+ allocated = freeFrame;
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Waking up waiting threads");
+ }
+ notifyAll();
}
}
- public ByteBuffer getFrame() {
- return frame;
+ public synchronized ByteBuffer retrieve() throws InterruptedException {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Attempting to get allocated buffer");
+ }
+ while (allocated == null) {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+ }
+ wait();
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Awoken Up");
+ }
+ }
+ ByteBuffer temp = allocated;
+ allocated = null;
+ return temp;
}
public void setFrame(ByteBuffer frame) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index f0d226a..a2f19bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -44,6 +44,7 @@ import org.apache.log4j.Logger;
public class FrameSpiller {
private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName());
private static final int FRAMES_PER_FILE = 1024;
+ public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
private final String fileNamePrefix;
private final ArrayDeque<File> files = new ArrayDeque<>();
@@ -88,11 +89,11 @@ public class FrameSpiller {
}
public synchronized ByteBuffer next() throws HyracksDataException {
+ frame.reset();
+ if (totalReadCount == totalWriteCount) {
+ return null;
+ }
try {
- frame.reset();
- if (totalReadCount == totalWriteCount) {
- return null;
- }
if (currentReadFile == null) {
if (!files.isEmpty()) {
currentReadFile = files.pop();
@@ -126,6 +127,10 @@ public class FrameSpiller {
return frame.getBuffer();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ synchronized (this) {
+ notify();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
index 25aa86a..e5543d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -28,10 +28,15 @@ import java.util.Map.Entry;
import org.apache.asterix.external.feed.dataflow.FrameAction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
public class ConcurrentFramePool {
+ private static final boolean DEBUG = false;
private static final String ERROR_INVALID_FRAME_SIZE =
"The size should be an integral multiple of the default frame size";
+ private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+ "The requested frame size must not be greater than the allocated budget";
+ private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
private final String nodeId;
private final int budget;
private final int defaultFrameSize;
@@ -49,11 +54,31 @@ public class ConcurrentFramePool {
this.largeFramesPools = new HashMap<>();
}
+ public int getMaxFrameSize() {
+ return budget * defaultFrameSize;
+ }
+
public synchronized ByteBuffer get() {
+ // Subscribers have higher priority
+ if (subscribers.isEmpty()) {
+ return doGet();
+ }
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
+ return null;
+ }
+
+ private ByteBuffer doGet() {
if (handedOut < budget) {
handedOut++;
return allocate();
}
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+ + ", Requested = 1");
+ }
return null;
}
@@ -61,11 +86,15 @@ public class ConcurrentFramePool {
return budget - handedOut;
}
- public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+ private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+ // Subscribers have higher priority
if (bufferSize % defaultFrameSize != 0) {
throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
}
int multiplier = bufferSize / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
if (handedOut + multiplier <= budget) {
handedOut += multiplier;
ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
@@ -76,9 +105,26 @@ public class ConcurrentFramePool {
created += multiplier;
return ByteBuffer.allocate(bufferSize);
}
- return largeFramesPool.poll();
+ ByteBuffer buffer = largeFramesPool.poll();
+ buffer.clear();
+ return buffer;
}
// Not enough budget
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+ + ", Requested = " + multiplier);
+ }
+ return null;
+ }
+
+ public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+ if (subscribers.isEmpty()) {
+ return doGet(bufferSize);
+ }
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
return null;
}
@@ -121,7 +167,9 @@ public class ConcurrentFramePool {
created++;
return ByteBuffer.allocate(defaultFrameSize);
} else {
- return pool.pop();
+ ByteBuffer buffer = pool.pop();
+ buffer.clear();
+ return buffer;
}
}
@@ -150,6 +198,9 @@ public class ConcurrentFramePool {
public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
int multiples = buffer.capacity() / defaultFrameSize;
handedOut -= multiples;
+ if (DEBUG) {
+ LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+ }
if (multiples == 1) {
pool.add(buffer);
} else {
@@ -163,22 +214,48 @@ public class ConcurrentFramePool {
// check subscribers
while (!subscribers.isEmpty()) {
FrameAction frameAction = subscribers.peek();
+ ByteBuffer freeBuffer;
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ if (DEBUG) {
+ LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+ }
+ freeBuffer = doGet();
} else {
- buffer = get(frameAction.getSize());
+ if (DEBUG) {
+ LOGGER.info("Attempting to callback a subscriber that requested "
+ + frameAction.getSize() / defaultFrameSize + " frames");
+ }
+ freeBuffer = doGet(frameAction.getSize());
}
- if (buffer != null) {
+ if (freeBuffer != null) {
+ int handedOutBeforeCall = handedOut;
try {
- frameAction.call(buffer);
+ frameAction.call(freeBuffer);
+ } catch (Exception e) {
+ LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+ // TODO(amoudi): Add test cases and get rid of recursion
+ if (handedOut == handedOutBeforeCall) {
+ release(freeBuffer);
+ }
+ throw e;
} finally {
subscribers.remove();
+ if (DEBUG) {
+ LOGGER.info(
+ "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+ }
}
} else {
+ if (DEBUG) {
+ LOGGER.info("Failed to allocate requested frames");
+ }
break;
}
}
+ if (DEBUG) {
+ LOGGER.info(subscribers.size() + " remaining subscribers");
+ }
}
public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
@@ -187,18 +264,30 @@ public class ConcurrentFramePool {
ByteBuffer buffer;
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ buffer = doGet();
} else {
- buffer = get(frameAction.getSize());
+ buffer = doGet(frameAction.getSize());
}
if (buffer != null) {
frameAction.call(buffer);
// There is no need to subscribe. perform action and return false
return false;
}
+ } else {
+ int multiplier = frameAction.getSize() / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
}
// none of the above, add to subscribers and return true
subscribers.add(frameAction);
return true;
}
+
+ /*
+ * For unit testing purposes
+ */
+ public Collection<FrameAction> getSubscribers() {
+ return subscribers;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
index f888542..18d4cff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -35,11 +35,11 @@ public class FeedRuntimeId implements Serializable {
private final String targetId;
private final int hashCode;
- public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String operandId) {
+ public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
this.feedId = feedId;
this.runtimeType = runtimeType;
this.partition = partition;
- this.targetId = operandId;
+ this.targetId = targetId;
this.hashCode = toString().hashCode();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
new file mode 100644
index 0000000..444d8a5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class ConcurrentFramePoolUnitTest extends TestCase {
+
+ // Frame size handed to every ConcurrentFramePool created by these tests.
+ private static final int DEFAULT_FRAME_SIZE = 32768;
+ // Number of default-size frames the tests expect the pool to hand out in total.
+ private static final int NUM_FRAMES = 2048;
+ // Budget returned by the mocked AsterixFeedProperties: NUM_FRAMES frames of DEFAULT_FRAME_SIZE.
+ private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+ // Number of allocator threads started by the concurrent tests.
+ private static final int NUM_THREADS = 8;
+ // Upper bound for random requests: sizes are drawn as nextInt(MAX_SIZE) + 1 frames.
+ private static final int MAX_SIZE = 52;
+ // A step is a release (instead of an acquire) when nextDouble() falls below this value.
+ private static final double RELEASE_PROBABILITY = 0.20;
+ // Set by FixedSizeGoodAllocator when a release throws; every test asserts it is still null.
+ private volatile static HyracksDataException cause = null;
+
+ /**
+ * Creates the test case and forwards the name to the {@code TestCase} superclass.
+ *
+ * @param testName name passed on to {@code super(testName)}
+ */
+ public ConcurrentFramePoolUnitTest(String testName) {
+ super(testName);
+ }
+
+ /**
+ * Builds a JUnit 3 style suite from this class.
+ *
+ * @return a {@code TestSuite} constructed from {@code ConcurrentFramePoolUnitTest.class}
+ */
+ public static Test suite() {
+ return new TestSuite(ConcurrentFramePoolUnitTest.class);
+ }
+
+ /**
+ * Drains a fresh pool one default-size frame at a time and checks that exactly
+ * NUM_FRAMES frames were handed out before get() returned null.
+ */
+ @org.junit.Test
+ public void testMemoryManager() {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int handedOut = 0;
+ while (fmm.get() != null) {
+ handedOut++;
+ }
+ // assertEquals takes (expected, actual); the reversed order produced misleading failure messages.
+ Assert.assertEquals(NUM_FRAMES, handedOut);
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Lets NUM_THREADS FixedSizeAllocator threads drain one shared pool and checks that
+ * the per-thread allocation counts add up to NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testConcurrentMemoryManager() {
+ try {
+ AsterixFeedProperties feedProps = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(feedProps.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool pool =
+ new ConcurrentFramePool("TestNode", feedProps.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ FixedSizeAllocator[] allocators = new FixedSizeAllocator[NUM_THREADS];
+ Arrays.parallelSetAll(allocators, (int idx) -> new FixedSizeAllocator(pool));
+ Thread[] workers = new Thread[NUM_THREADS];
+ for (int idx = 0; idx < workers.length; idx++) {
+ workers[idx] = new Thread(allocators[idx]);
+ }
+ // Every worker is started before any is joined, so they all run against the same pool.
+ for (Thread worker : workers) {
+ worker.start();
+ }
+ for (Thread worker : workers) {
+ worker.join();
+ }
+ int total = 0;
+ for (FixedSizeAllocator allocator : allocators) {
+ total += allocator.getAllocated();
+ }
+ Assert.assertEquals(NUM_FRAMES, total);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Requests randomly sized buffers (1..MAX_SIZE frames) from a fresh pool until a request
+ * returns null, then checks the bookkeeping: nothing was over-allocated, the failed
+ * request would not have fit, and allocated + remaining equals NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testVarSizeMemoryManager() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ Random random = new Random();
+ int i = 0;
+ int req;
+ while (true) {
+ req = random.nextInt(MAX_SIZE) + 1;
+ if (req == 1) {
+ // Single-frame requests go through the no-argument get().
+ if (fmm.get() != null) {
+ i += 1;
+ } else {
+ break;
+ }
+ } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+ i += req;
+ } else {
+ break;
+ }
+ }
+
+ // assertTrue states the intent directly; assertEquals takes (expected, actual).
+ Assert.assertTrue(i <= NUM_FRAMES);
+ Assert.assertTrue(i + req > NUM_FRAMES);
+ Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Lets NUM_THREADS VarSizeAllocator threads request randomly sized buffers from one shared
+ * pool until each thread has a request return null. Afterwards it checks that no runner
+ * recorded a failure, that nothing was over-allocated, that no runner's last request would
+ * still fit, and that allocated + remaining equals NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testConcurrentVarSizeMemoryManager() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+
+ VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
+ Thread[] threads = new Thread[NUM_THREADS];
+ Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm));
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(runners[i]);
+ }
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].start();
+ }
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ }
+ int allocated = 0;
+ for (int i = 0; i < threads.length; i++) {
+ // Surface any throwable a runner captured on its own thread.
+ if (runners[i].cause() != null) {
+ runners[i].cause().printStackTrace();
+ Assert.fail(runners[i].cause().getMessage());
+ }
+ allocated += runners[i].getAllocated();
+ }
+ // assertTrue states the intent directly; assertEquals takes (expected, actual).
+ Assert.assertTrue(allocated <= NUM_FRAMES);
+ for (int i = 0; i < threads.length; i++) {
+ Assert.assertTrue(allocated + runners[i].getLastReq() > NUM_FRAMES);
+ }
+ Assert.assertEquals(NUM_FRAMES, allocated + fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Single-threaded random mix of single-frame acquisitions and releases that runs until
+ * get() returns null. At that point all NUM_FRAMES frames must be held and none remain;
+ * after releasing every held frame the pool must report NUM_FRAMES remaining again.
+ *
+ * @throws HyracksDataException if releasing a frame to the pool fails
+ */
+ @org.junit.Test
+ public void testAcquireReleaseMemoryManager() throws HyracksDataException {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ Random random = new Random();
+ ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+ while (true) {
+ if (random.nextDouble() < RELEASE_PROBABILITY) {
+ if (!stack.isEmpty()) {
+ fmm.release(stack.pop());
+ }
+ } else {
+ ByteBuffer buffer = fmm.get();
+ if (buffer == null) {
+ break;
+ }
+ stack.push(buffer);
+ }
+ }
+ // assertEquals takes (expected, actual); the reversed order produced misleading failure messages.
+ Assert.assertEquals(NUM_FRAMES, stack.size());
+ Assert.assertEquals(0, fmm.remaining());
+ for (ByteBuffer buffer : stack) {
+ fmm.release(buffer);
+ }
+ stack.clear();
+ Assert.assertEquals(NUM_FRAMES, fmm.remaining());
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Runs NUM_THREADS FixedSizeGoodAllocator threads (random acquire/release of single frames)
+ * against one shared pool and checks that, once all have finished, the frames they still
+ * hold add up to NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testConcurrentAcquireReleaseMemoryManager() {
+ try {
+ AsterixFeedProperties feedProps = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(feedProps.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool pool =
+ new ConcurrentFramePool("TestNode", feedProps.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ FixedSizeGoodAllocator[] allocators = new FixedSizeGoodAllocator[NUM_THREADS];
+ Arrays.parallelSetAll(allocators, (int idx) -> new FixedSizeGoodAllocator(pool));
+ Thread[] workers = new Thread[NUM_THREADS];
+ for (int idx = 0; idx < workers.length; idx++) {
+ workers[idx] = new Thread(allocators[idx]);
+ }
+ // Every worker is started before any is joined, so they all run against the same pool.
+ for (Thread worker : workers) {
+ worker.start();
+ }
+ for (Thread worker : workers) {
+ worker.join();
+ }
+ int held = 0;
+ for (FixedSizeGoodAllocator allocator : allocators) {
+ held += allocator.getAllocated();
+ }
+ Assert.assertEquals(NUM_FRAMES, held);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Single-threaded random mix of releases and randomly sized acquisitions (1..MAX_SIZE
+ * frames) that runs until an acquisition returns null, then checks the bookkeeping:
+ * nothing was over-allocated, the failed request would not have fit, and
+ * held + remaining equals NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testAcquireReleaseVarSizeMemoryManager() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ Random random = new Random();
+ ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+ int i = 0;
+ int req;
+ while (true) {
+ if (random.nextDouble() < RELEASE_PROBABILITY) {
+ // release: give back the most recently acquired buffer, if any
+ if (!stack.isEmpty()) {
+ ByteBuffer buffer = stack.pop();
+ i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+ fmm.release(buffer);
+ }
+ } else {
+ // acquire: single-frame requests use get(), larger ones use get(size)
+ req = random.nextInt(MAX_SIZE) + 1;
+ ByteBuffer buffer = (req == 1) ? fmm.get() : fmm.get(req * DEFAULT_FRAME_SIZE);
+ if (buffer == null) {
+ break;
+ }
+ stack.push(buffer);
+ i += req;
+ }
+ }
+
+ // assertTrue states the intent directly; assertEquals takes (expected, actual).
+ Assert.assertTrue(i <= NUM_FRAMES);
+ Assert.assertTrue(i + req > NUM_FRAMES);
+ Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ /**
+ * Runs NUM_THREADS VarSizeGoodAllocator threads (random acquire/release of variable-size
+ * buffers) against one shared pool. Fails if any runner captured a throwable; otherwise
+ * checks that the frames still held plus the pool's remaining frames equal NUM_FRAMES.
+ */
+ @org.junit.Test
+ public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
+ try {
+ AsterixFeedProperties feedProps = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(feedProps.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool pool =
+ new ConcurrentFramePool("TestNode", feedProps.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ VarSizeGoodAllocator[] allocators = new VarSizeGoodAllocator[NUM_THREADS];
+ Arrays.parallelSetAll(allocators, (int idx) -> new VarSizeGoodAllocator(pool));
+ Thread[] workers = new Thread[NUM_THREADS];
+ for (int idx = 0; idx < workers.length; idx++) {
+ workers[idx] = new Thread(allocators[idx]);
+ }
+ // Every worker is started before any is joined, so they all run against the same pool.
+ for (Thread worker : workers) {
+ worker.start();
+ }
+ for (Thread worker : workers) {
+ worker.join();
+ }
+ int held = 0;
+ for (VarSizeGoodAllocator allocator : allocators) {
+ Throwable failure = allocator.cause();
+ if (failure != null) {
+ failure.printStackTrace();
+ Assert.fail(failure.getMessage());
+ }
+ held += allocator.getAllocated();
+ }
+ Assert.assertEquals(NUM_FRAMES, held + pool.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ /**
+ * Subscribes repeatedly with a single default-size FrameAction, collecting the retrieved
+ * frame after every subscribe() call that returns false, until subscribe() returns true.
+ * Exactly NUM_FRAMES frames must have been collected by then. After one frame is released
+ * the pool must still report 0 remaining frames.
+ */
+ @org.junit.Test
+ public void testFixedSizeSubscribtion() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.retrieve());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed.
+ // assertEquals takes (expected, actual); the original had them reversed.
+ Assert.assertEquals(NUM_FRAMES, i);
+ // Release a frame (it will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames are still consumed (since the released frame has been handed to the subscriber)
+ Assert.assertEquals(0, fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ @org.junit.Test
+ public void testLargerThanBudgetRequests() {
+ HyracksDataException hde = null;
+ try {
+ ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ fmm.get(32 * DEFAULT_FRAME_SIZE);
+ } catch (HyracksDataException e) {
+ hde = e;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNotNull(hde);
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Subscribes to a 16-frame pool with a FrameAction whose frame is 32 frames large and
+ * expects subscribe() to throw a HyracksDataException; any other throwable fails the test.
+ */
+ @org.junit.Test
+ public void testLargerThanBudgetSubscribe() {
+ HyracksDataException thrown = null;
+ try {
+ ConcurrentFramePool pool = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ // The action's frame is twice the pool's whole budget.
+ FrameAction oversizedAction = new FrameAction();
+ oversizedAction.setFrame(ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32));
+ pool.subscribe(oversizedAction);
+ } catch (HyracksDataException expected) {
+ thrown = expected;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNotNull(thrown);
+ Assert.assertNull(cause);
+ }
+
+ /**
+ * Checks that a plain get() fails while a subscriber is waiting for a larger frame.
+ * The test first drains the pool through a default-size subscriber, then adds a second
+ * subscriber asking for a double-size frame, releases small frames one at a time and
+ * asserts on remaining() and get() after each step. Finally everything is released and
+ * the pool must report NUM_FRAMES remaining.
+ */
+ @org.junit.Test
+ public void testgetWhileSubscribersExist() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ // Collect a frame after every subscribe() call that returns false.
+ // NOTE(review): presumably false means the request was satisfied immediately - confirm against ConcurrentFramePool.
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.retrieve());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed
+ // NOTE(review): arguments are in (actual, expected) order here and in several assertions below.
+ Assert.assertEquals(i, NUM_FRAMES);
+ // Release a frame (it will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames are still consumed (since the released frame has been handed to the subscriber)
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffers.put(frameAction.retrieve());
+ // Create another subscriber that takes frames of double the size
+ ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+ LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>();
+ FrameAction frameActionTimes2 = new FrameAction();
+ frameActionTimes2.setFrame(bufferTimes2);
+ Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+ // Release a small one; a single frame is not enough for the double-size subscriber
+ fmm.release(buffers.take());
+ Assert.assertEquals(fmm.remaining(), 1);
+ // Check that a small get fails even though one frame remains
+ Assert.assertEquals(null, fmm.get());
+ // Release another small one
+ fmm.release(buffers.take());
+ // Check that no small frames exist in the pool since the subscriber's request was satisfied
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffersTimes2.add(frameActionTimes2.retrieve());
+ fmm.release(buffers);
+ // NOTE(review): this releases the locally allocated bufferTimes2, not the frame collected in
+ // buffersTimes2 (which is never read afterwards) - confirm that is intended.
+ fmm.release(bufferTimes2);
+ Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ /*
+ * Runnables used for unit tests
+ */
+ /**
+ * Runnable that takes default-size frames from the pool, never releasing any, until
+ * get() returns null. Declared static because it uses nothing from the enclosing test
+ * instance, so no hidden reference to it is kept.
+ */
+ private static class FixedSizeAllocator implements Runnable {
+ private final ConcurrentFramePool fmm;
+ // Number of frames this runner obtained; read by the test after the thread is joined.
+ private int allocated = 0;
+
+ public FixedSizeAllocator(ConcurrentFramePool fmm) {
+ this.fmm = fmm;
+ }
+
+ /** @return the number of frames obtained by {@link #run()} so far */
+ public int getAllocated() {
+ return allocated;
+ }
+
+ @Override
+ public void run() {
+ while (fmm.get() != null) {
+ allocated++;
+ }
+ }
+ }
+
+ /**
+ * Runnable that randomly alternates between releasing its most recent frame (with
+ * probability RELEASE_PROBABILITY) and acquiring a default-size frame, stopping when
+ * get() returns null. A failed release is recorded in the test's static {@code cause}
+ * field. Declared static because it uses only static members of the enclosing class.
+ */
+ private static class FixedSizeGoodAllocator implements Runnable {
+ private final ConcurrentFramePool fmm;
+ // Frames currently held by this runner, most recent first.
+ private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+ private final Random random = new Random();
+
+ public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
+ this.fmm = fmm;
+ }
+
+ /** @return the number of frames this runner currently holds */
+ public int getAllocated() {
+ return stack.size();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ if (random.nextDouble() < RELEASE_PROBABILITY) {
+ if (!stack.isEmpty()) {
+ try {
+ fmm.release(stack.pop());
+ } catch (HyracksDataException e) {
+ // Record the failure for the test thread; the loop keeps going.
+ e.printStackTrace();
+ cause = e;
+ }
+ }
+ } else {
+ ByteBuffer buffer = fmm.get();
+ if (buffer == null) {
+ break;
+ } else {
+ stack.push(buffer);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Runnable that randomly alternates between releasing its most recent buffer (with
+ * probability RELEASE_PROBABILITY) and acquiring a buffer of 1..MAX_SIZE frames, stopping
+ * when an acquisition returns null. Any throwable raised inside the loop is captured and
+ * exposed through {@link #cause()}. Declared static because it uses only static members
+ * of the enclosing class.
+ */
+ private static class VarSizeGoodAllocator implements Runnable {
+ private final ConcurrentFramePool fmm;
+ // Frames currently held, counted in units of DEFAULT_FRAME_SIZE.
+ private int allocated = 0;
+ // Size, in frames, of the most recent acquisition attempt.
+ private int req = 0;
+ private final Random random = new Random();
+ // Throwable that ended run(), or null if the loop ended normally.
+ private Throwable cause;
+ private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
+
+ public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
+ this.fmm = fmm;
+ }
+
+ /** @return the number of frames this runner currently holds */
+ public int getAllocated() {
+ return allocated;
+ }
+
+ /** @return the throwable captured by {@link #run()}, or null if there was none */
+ public Throwable cause() {
+ return cause;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ if (random.nextDouble() < RELEASE_PROBABILITY) {
+ if (!stack.isEmpty()) {
+ ByteBuffer buffer = stack.pop();
+ allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
+ fmm.release(buffer);
+ }
+ } else {
+ req = random.nextInt(MAX_SIZE) + 1;
+ if (req == 1) {
+ // Single-frame requests go through the no-argument get().
+ ByteBuffer buffer = fmm.get();
+ if (buffer != null) {
+ stack.push(buffer);
+ allocated += 1;
+ } else {
+ break;
+ }
+ } else {
+ ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
+ if (buffer != null) {
+ stack.push(buffer);
+ allocated += req;
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ } catch (Throwable th) {
+ this.cause = th;
+ }
+ }
+ }
+
+ /**
+ * Runnable that keeps requesting buffers of 1..MAX_SIZE frames, never releasing any,
+ * until a request returns null. Any throwable raised inside the loop is captured and
+ * exposed through {@link #cause()}. Declared static because it uses only static members
+ * of the enclosing class.
+ */
+ private static class VarSizeAllocator implements Runnable {
+ private final ConcurrentFramePool fmm;
+ // Frames obtained so far, counted in units of DEFAULT_FRAME_SIZE.
+ private int allocated = 0;
+ // Size, in frames, of the most recent request (the failed one once run() has ended normally).
+ private int req = 0;
+ private final Random random = new Random();
+ // Throwable that ended run(), or null if the loop ended normally.
+ private Throwable cause;
+
+ public VarSizeAllocator(ConcurrentFramePool fmm) {
+ this.fmm = fmm;
+ }
+
+ /** @return the number of frames obtained by {@link #run()} */
+ public int getAllocated() {
+ return allocated;
+ }
+
+ /** @return the size, in frames, of the most recent request */
+ public int getLastReq() {
+ return req;
+ }
+
+ /** @return the throwable captured by {@link #run()}, or null if there was none */
+ public Throwable cause() {
+ return cause;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ req = random.nextInt(MAX_SIZE) + 1;
+ if (req == 1) {
+ // Single-frame requests go through the no-argument get().
+ if (fmm.get() != null) {
+ allocated += 1;
+ } else {
+ break;
+ }
+ } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
+ allocated += req;
+ } else {
+ break;
+ }
+ }
+ } catch (Throwable th) {
+ this.cause = th;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/803a3a2f/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
deleted file mode 100644
index 8a6d1b6..0000000
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.external.feed.management.ConcurrentFramePool;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-@RunWith(PowerMockRunner.class)
-public class FeedMemoryManagerUnitTest extends TestCase {
-
- private static final int DEFAULT_FRAME_SIZE = 32768;
- private static final int NUM_FRAMES = 2048;
- private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
- private static final int NUM_THREADS = 8;
- private static final int MAX_SIZE = 52;
- private static final double RELEASE_PROBABILITY = 0.20;
-
- public FeedMemoryManagerUnitTest(String testName) {
- super(testName);
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite() {
- return new TestSuite(FeedMemoryManagerUnitTest.class);
- }
-
- @org.junit.Test
- public void testMemoryManager() {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- int i = 0;
- while (fmm.get() != null) {
- i++;
- }
- Assert.assertEquals(i, NUM_FRAMES);
- }
-
- @org.junit.Test
- public void testConcurrentMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
- Thread[] threads = new Thread[NUM_THREADS];
- Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm));
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(runners[i]);
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].join();
- }
- int i = 0;
- for (FixedSizeAllocator allocator : runners) {
- i += allocator.getAllocated();
- }
- Assert.assertEquals(NUM_FRAMES, i);
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- @org.junit.Test
- public void testVarSizeMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- Random random = new Random();
- int i = 0;
- int req;
- while (true) {
- req = random.nextInt(MAX_SIZE) + 1;
- if (req == 1) {
- if (fmm.get() != null) {
- i += 1;
- } else {
- break;
- }
- } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
- i += req;
- } else {
- break;
- }
- }
-
- Assert.assertEquals(i <= NUM_FRAMES, true);
- Assert.assertEquals(i + req > NUM_FRAMES, true);
- Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- @org.junit.Test
- public void testConcurrentVarSizeMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
-
- VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
- Thread[] threads = new Thread[NUM_THREADS];
- Arrays.parallelSetAll(runners, (int i) -> new VarSizeAllocator(fmm));
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(runners[i]);
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].join();
- }
- int allocated = 0;
- for (int i = 0; i < threads.length; i++) {
- if (runners[i].cause() != null) {
- runners[i].cause().printStackTrace();
- Assert.fail(runners[i].cause().getMessage());
- }
- allocated += runners[i].getAllocated();
- }
- Assert.assertEquals(allocated <= NUM_FRAMES, true);
- for (int i = 0; i < threads.length; i++) {
- Assert.assertEquals(allocated + runners[i].getLastReq() > NUM_FRAMES, true);
- }
- Assert.assertEquals(allocated + fmm.remaining(), NUM_FRAMES);
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- @org.junit.Test
- public void testAcquireReleaseMemoryManager() throws HyracksDataException {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- Random random = new Random();
- ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
- while (true) {
- if (random.nextDouble() < RELEASE_PROBABILITY) {
- if (!stack.isEmpty()) {
- fmm.release(stack.pop());
- }
- } else {
- ByteBuffer buffer = fmm.get();
- if (buffer == null) {
- break;
- } else {
- stack.push(buffer);
- }
- }
- }
- Assert.assertEquals(stack.size(), NUM_FRAMES);
- Assert.assertEquals(fmm.remaining(), 0);
- for (ByteBuffer buffer : stack) {
- fmm.release(buffer);
- }
- stack.clear();
- Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
- }
-
- @org.junit.Test
- public void testConcurrentAcquireReleaseMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS];
- Thread[] threads = new Thread[NUM_THREADS];
- Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm));
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(runners[i]);
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].join();
- }
- int i = 0;
- for (FixedSizeGoodAllocator allocator : runners) {
- i += allocator.getAllocated();
- }
- Assert.assertEquals(NUM_FRAMES, i);
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- @org.junit.Test
- public void testAcquireReleaseVarSizeMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- Random random = new Random();
- ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
- int i = 0;
- int req;
- while (true) {
- // release
- if (random.nextDouble() < RELEASE_PROBABILITY) {
- if (!stack.isEmpty()) {
- ByteBuffer buffer = stack.pop();
- i -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
- fmm.release(buffer);
- }
- } else {
- // acquire
- req = random.nextInt(MAX_SIZE) + 1;
- if (req == 1) {
- ByteBuffer buffer = fmm.get();
- if (buffer != null) {
- stack.push(buffer);
- i += 1;
- } else {
- break;
- }
- } else {
- ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
- if (buffer != null) {
- stack.push(buffer);
- i += req;
- } else {
- break;
- }
- }
- }
- }
-
- Assert.assertEquals(i <= NUM_FRAMES, true);
- Assert.assertEquals(i + req > NUM_FRAMES, true);
- Assert.assertEquals(i + fmm.remaining(), NUM_FRAMES);
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- @org.junit.Test
- public void testConcurrentAcquireReleaseVarSizeMemoryManager() {
- try {
- AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
- Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
- ConcurrentFramePool fmm =
- new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
- VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS];
- Thread[] threads = new Thread[NUM_THREADS];
- Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm));
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new Thread(runners[i]);
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].start();
- }
- for (int i = 0; i < threads.length; i++) {
- threads[i].join();
- }
- int i = 0;
- for (VarSizeGoodAllocator allocator : runners) {
- if (allocator.cause() != null) {
- allocator.cause().printStackTrace();
- Assert.fail(allocator.cause().getMessage());
- }
- i += allocator.getAllocated();
- }
- Assert.assertEquals(NUM_FRAMES, i + fmm.remaining());
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail(th.getMessage());
- }
- }
-
- /*
- * Runnables used for unit tests
- */
- private class FixedSizeAllocator implements Runnable {
- private final ConcurrentFramePool fmm;
- private int allocated = 0;
-
- public FixedSizeAllocator(ConcurrentFramePool fmm) {
- this.fmm = fmm;
- }
-
- public int getAllocated() {
- return allocated;
- }
-
- @Override
- public void run() {
- while (fmm.get() != null) {
- allocated++;
- }
- }
- }
-
- private class FixedSizeGoodAllocator implements Runnable {
- private final ConcurrentFramePool fmm;
- private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
- private final Random random = new Random();
-
- public FixedSizeGoodAllocator(ConcurrentFramePool fmm) {
- this.fmm = fmm;
- }
-
- public int getAllocated() {
- return stack.size();
- }
-
- @Override
- public void run() {
- while (true) {
- if (random.nextDouble() < RELEASE_PROBABILITY) {
- if (!stack.isEmpty()) {
- try {
- fmm.release(stack.pop());
- } catch (HyracksDataException e) {
- Assert.fail();
- }
- }
- } else {
- ByteBuffer buffer = fmm.get();
- if (buffer == null) {
- break;
- } else {
- stack.push(buffer);
- }
- }
- }
- }
- }
-
- private class VarSizeGoodAllocator implements Runnable {
- private final ConcurrentFramePool fmm;
- private int allocated = 0;
- private int req = 0;
- private final Random random = new Random();
- private Throwable cause;
- private final ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
-
- public VarSizeGoodAllocator(ConcurrentFramePool fmm) {
- this.fmm = fmm;
- }
-
- public int getAllocated() {
- return allocated;
- }
-
- public Throwable cause() {
- return cause;
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- if (random.nextDouble() < RELEASE_PROBABILITY) {
- if (!stack.isEmpty()) {
- ByteBuffer buffer = stack.pop();
- allocated -= (buffer.capacity() / DEFAULT_FRAME_SIZE);
- fmm.release(buffer);
- }
- } else {
- req = random.nextInt(MAX_SIZE) + 1;
- if (req == 1) {
- ByteBuffer buffer = fmm.get();
- if (buffer != null) {
- stack.push(buffer);
- allocated += 1;
- } else {
- break;
- }
- } else {
- ByteBuffer buffer = fmm.get(req * DEFAULT_FRAME_SIZE);
- if (buffer != null) {
- stack.push(buffer);
- allocated += req;
- } else {
- break;
- }
- }
- }
- }
- } catch (Throwable th) {
- this.cause = th;
- }
- }
- }
-
- private class VarSizeAllocator implements Runnable {
- private final ConcurrentFramePool fmm;
- private int allocated = 0;
- private int req = 0;
- private final Random random = new Random();
- private Throwable cause;
-
- public VarSizeAllocator(ConcurrentFramePool fmm) {
- this.fmm = fmm;
- }
-
- public int getAllocated() {
- return allocated;
- }
-
- public int getLastReq() {
- return req;
- }
-
- public Throwable cause() {
- return cause;
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- req = random.nextInt(MAX_SIZE) + 1;
- if (req == 1) {
- if (fmm.get() != null) {
- allocated += 1;
- } else {
- break;
- }
- } else if (fmm.get(req * DEFAULT_FRAME_SIZE) != null) {
- allocated += req;
- } else {
- break;
- }
- }
- } catch (Throwable th) {
- this.cause = th;
- }
- }
- }
-}