You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/11 03:04:59 UTC
[1/2] incubator-apex-core git commit: APEX-68: Buffer server should
use a separate thread to spool blocks to disk
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 97cbef6c7 -> 928d36804
APEX-68: Buffer server should use a separate thread to spool blocks to disk
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1b8aecf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1b8aecf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1b8aecf3
Branch: refs/heads/devel-3
Commit: 1b8aecf3b429069b92a790a4aae2e41490934de7
Parents: f2a4071
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Wed Sep 9 15:41:30 2015 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Wed Sep 9 15:41:30 2015 -0700
----------------------------------------------------------------------
.../bufferserver/internal/DataList.java | 598 +++++++++++--------
.../bufferserver/internal/FastDataList.java | 64 +-
.../datatorrent/bufferserver/server/Server.java | 101 +++-
3 files changed, 425 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index baa052a..6806168 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -19,6 +19,8 @@ import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +34,13 @@ import com.datatorrent.bufferserver.util.BitVector;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.bufferserver.util.SerializedData;
import com.datatorrent.bufferserver.util.VarInt;
+import com.datatorrent.netlet.AbstractClient;
import com.datatorrent.netlet.util.VarInt.MutableInt;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
/**
* Maintains list of data and manages addition and deletion of the data<p>
* <br>
@@ -44,83 +51,137 @@ public class DataList
{
private final int MAX_COUNT_OF_INMEM_BLOCKS;
protected final String identifier;
- private final Integer blocksize;
- private HashMap<BitVector, HashSet<DataListener>> listeners = new HashMap<BitVector, HashSet<DataListener>>();
- protected HashSet<DataListener> all_listeners = new HashSet<DataListener>();
+ private final int blockSize;
+ private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap();
+ protected final HashSet<DataListener> all_listeners = newHashSet();
+ protected final HashMap<String, DataListIterator> iterators = newHashMap();
protected Block first;
protected Block last;
protected Storage storage;
- protected ExecutorService autoflushExecutor;
+ protected ExecutorService autoFlushExecutor;
protected ExecutorService storageExecutor;
+ protected int size;
+ protected int processingOffset;
+ protected long baseSeconds;
+ private final List<AbstractClient> suspendedClients = newArrayList();
+ private final AtomicInteger numberOfInMemBlockPermits;
+ private MutableInt nextOffset = new MutableInt();
+
+ public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks)
+ {
+ if (numberOfCacheBlocks < 1) {
+ throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks);
+ }
+ this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks;
+ numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1);
+ this.identifier = identifier;
+ this.blockSize = blockSize;
+ first = last = new Block(identifier, blockSize);
+ }
- public int getBlockSize()
+ public DataList(String identifier)
{
- return blocksize;
+ /*
+ * We use 64MB (the default HDFS block size) as the size of each memory block so we can flush the data one block at a time to the filesystem.
+ * By default, 8 blocks are cached in memory.
+ */
+ this(identifier, 64 * 1024 * 1024, 8);
}
- public void rewind(int baseSeconds, int windowId) throws IOException
+ public int getBlockSize()
{
- long longWindowId = (long)baseSeconds << 32 | windowId;
+ return blockSize;
+ }
- for (Block temp = first; temp != null; temp = temp.next) {
+ public void rewind(final int baseSeconds, final int windowId) throws IOException
+ {
+ final long longWindowId = (long)baseSeconds << 32 | windowId;
+ logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId));
- if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
- if (temp != last) {
- temp.next = null;
- last = temp;
+ int numberOfInMemBlockRewound = 0;
+ synchronized (this) {
+ for (Block temp = first; temp != null; temp = temp.next) {
+ if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
+ if (temp != last) {
+ last = temp;
+ do {
+ temp = temp.next;
+ temp.discard(false);
+ if (temp.data != null) {
+ temp.data = null;
+ numberOfInMemBlockRewound++;
+ }
+ } while (temp.next != null);
+ last.next = null;
+ last.acquire(true);
+ }
+ this.baseSeconds = last.rewind(longWindowId);
+ processingOffset = last.writingOffset;
+ size = 0;
+ break;
}
-
- this.baseSeconds = temp.rewind(longWindowId);
- processingOffset = temp.writingOffset;
- size = 0;
}
}
- for (DataListIterator dli : iterators.values()) {
- dli.rewind(processingOffset);
- }
+ /*
+ TODO: properly rewind Data List iterators; in particular, handle the case where an iterator points to a block past the new last block.
+ */
+
+ final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound);
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ resumeSuspendedClients(numberOfInMemBlockPermits);
+ logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
+
}
public void reset()
{
+ logger.debug("Resetting {}", this);
listeners.clear();
all_listeners.clear();
- if (storage != null) {
- while (first != null) {
- if (first.uniqueIdentifier > 0) {
- logger.debug("discarding {} {} in reset", identifier, first.uniqueIdentifier);
- storage.discard(identifier, first.uniqueIdentifier);
+ synchronized (this) {
+ if (storage != null) {
+ Block temp = first;
+ while (temp != last) {
+ temp.discard(false);
+ temp.data = null;
+ temp = temp.next;
}
- first = first.next;
}
+ first = last;
}
+ numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1);
}
- public void purge(int baseSeconds, int windowId)
+ public void purge(final int baseSeconds, final int windowId)
{
- long longWindowId = (long)baseSeconds << 32 | windowId;
- logger.debug("purge request for windowId {}", Codec.getStringWindowId(longWindowId));
-
- Block prev = null;
- for (Block temp = first; temp != null && temp.starting_window <= longWindowId; temp = temp.next) {
- if (temp.ending_window > longWindowId || temp == last) {
- if (prev != null) {
- first = temp;
+ final long longWindowId = (long)baseSeconds << 32 | windowId;
+ logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId));
+
+ int numberOfInMemBlockPurged = 0;
+ synchronized (this) {
+ for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) {
+ if (temp.ending_window > longWindowId || temp == last) {
+ if (prev != null) {
+ first = temp;
+ }
+ first.purge(longWindowId);
+ break;
+ }
+ temp.discard(false);
+ if (temp.data != null) {
+ temp.data = null;
+ numberOfInMemBlockPurged++;
}
-
- first.purge(longWindowId);
- break;
}
+ }
- if (storage != null && temp.uniqueIdentifier > 0) {
- logger.debug("discarding {} {} in purge", identifier, temp.uniqueIdentifier);
-
- storage.discard(identifier, temp.uniqueIdentifier);
- }
+ final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged);
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ resumeSuspendedClients(numberOfInMemBlockPermits);
+ logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
- prev = temp;
- }
}
/**
@@ -131,35 +192,6 @@ public class DataList
return identifier;
}
- public DataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount)
- {
- this(identifier, blocksize, numberOfCacheBlocks);
- first.refCount = refCount;
- }
-
- public DataList(String identifier, int blocksize, int numberOfCacheBlocks)
- {
- this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks;
- this.identifier = identifier;
- this.blocksize = blocksize;
- first = new Block(identifier, blocksize);
- last = first;
- }
-
- public DataList(String identifier)
- {
- /*
- * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem.
- * we will use default value of 8 block sizes to be cached in memory
- */
- this(identifier, 64 * 1024 * 1024, 8);
- }
-
- MutableInt nextOffset = new MutableInt();
- long baseSeconds;
- int size;
- int processingOffset;
-
public void flush(final int writeOffset)
{
//logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset);
@@ -195,8 +227,7 @@ public class DataList
last.starting_window = baseSeconds | bwt.getWindowId();
last.ending_window = last.starting_window;
//logger.debug("assigned both window id {}", last);
- }
- else {
+ } else {
last.ending_window = baseSeconds | bwt.getWindowId();
//logger.debug("assigned last window id {}", last);
}
@@ -209,8 +240,7 @@ public class DataList
}
processingOffset += size;
size = 0;
- }
- else {
+ } else {
if (writeOffset == last.data.length) {
nextOffset.integer = 0;
processingOffset = 0;
@@ -218,12 +248,11 @@ public class DataList
}
break;
}
- }
- while (true);
+ } while (true);
last.writingOffset = writeOffset;
- autoflushExecutor.submit(new Runnable()
+ autoFlushExecutor.submit(new Runnable()
{
@Override
public void run()
@@ -236,9 +265,9 @@ public class DataList
});
}
- public void setAutoflushExecutor(final ExecutorService es)
+ public void setAutoFlushExecutor(final ExecutorService es)
{
- autoflushExecutor = es;
+ autoFlushExecutor = es;
}
public void setSecondaryStorage(Storage storage, ExecutorService es)
@@ -250,13 +279,16 @@ public class DataList
/*
* Iterator related functions.
*/
- protected final HashMap<String, DataListIterator> iterators = new HashMap<String, DataListIterator>();
-
- public DataListIterator getIterator(Block block)
+ protected DataListIterator getIterator(final Block block)
{
return new DataListIterator(block);
}
+ private synchronized Block getNextBlock(final Block block)
+ {
+ return block.next;
+ }
+
public Iterator<SerializedData> newIterator(String identifier, long windowId)
{
//logger.debug("request for a new iterator {} and {}", identifier, windowId);
@@ -283,21 +315,17 @@ public class DataList
*/
public boolean delIterator(Iterator<SerializedData> iterator)
{
- boolean released = false;
if (iterator instanceof DataListIterator) {
DataListIterator dli = (DataListIterator)iterator;
for (Entry<String, DataListIterator> e : iterators.entrySet()) {
if (e.getValue() == dli) {
- if (dli.da != null) {
- dli.da.release(false);
- }
+ dli.close();
iterators.remove(e.getKey());
- released = true;
- break;
+ return true;
}
}
}
- return released;
+ return false;
}
public void addDataListener(DataListener dl)
@@ -310,20 +338,17 @@ public class DataList
HashSet<DataListener> set;
if (listeners.containsKey(partition)) {
set = listeners.get(partition);
- }
- else {
+ } else {
set = new HashSet<DataListener>();
listeners.put(partition, set);
}
set.add(dl);
}
- }
- else {
+ } else {
HashSet<DataListener> set;
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
set = listeners.get(DataListener.NULL_PARTITION);
- }
- else {
+ } else {
set = new HashSet<DataListener>();
listeners.put(DataListener.NULL_PARTITION, set);
}
@@ -341,8 +366,7 @@ public class DataList
listeners.get(partition).remove(dl);
}
}
- }
- else {
+ } else {
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
listeners.get(DataListener.NULL_PARTITION).remove(dl);
}
@@ -351,43 +375,46 @@ public class DataList
all_listeners.remove(dl);
}
- public void addBuffer(byte[] array)
+ public boolean suspendRead(final AbstractClient client)
{
- last.next = new Block(identifier, array);
- last.next.starting_window = last.ending_window;
- last.next.ending_window = last.ending_window;
- last = last.next;
-
- //logger.debug("addbuffer last = {}", last);
- int inmemBlockCount;
+ synchronized (suspendedClients) {
+ return client.suspendReadIfResumed() && suspendedClients.add(client);
+ }
+ }
- inmemBlockCount = 0;
- for (Block temp = first; temp != null; temp = temp.next) {
- if (temp.data != null) {
- inmemBlockCount++;
+ public boolean resumeSuspendedClients(final int numberOfInMemBlockPermits)
+ {
+ boolean resumedSuspendedClients = false;
+ if (numberOfInMemBlockPermits > 0) {
+ synchronized (suspendedClients) {
+ for (AbstractClient client : suspendedClients) {
+ resumedSuspendedClients |= client.resumeReadIfSuspended();
+ }
+ suspendedClients.clear();
}
}
+ return resumedSuspendedClients;
+ }
- if (inmemBlockCount >= MAX_COUNT_OF_INMEM_BLOCKS) {
- //logger.debug("InmemBlockCount before releaes {}", inmemBlockCount);
- for (Block temp = first; temp != null; temp = temp.next) {
- boolean found = false;
- for (DataListIterator iterator : iterators.values()) {
- if (iterator.da == temp) {
- found = true;
- break;
- }
- }
+ public boolean isMemoryBlockAvailable()
+ {
+ return numberOfInMemBlockPermits.get() > 0;
+ }
- if (!found && temp.data != null) {
- temp.release(true);
- if (--inmemBlockCount < MAX_COUNT_OF_INMEM_BLOCKS) {
- break;
- }
- }
- }
- //logger.debug("InmemBlockCount after release {}", inmemBlockCount);
+ public byte[] newBuffer()
+ {
+ return new byte[blockSize];
+ }
+
+ public void addBuffer(byte[] array)
+ {
+ final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet();
+ if (numberOfInMemBlockPermits < 0) {
+ logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
}
+ last.next = new Block(identifier, array, last.ending_window, last.ending_window);
+ last.release(false);
+ last = last.next;
}
public byte[] getBuffer(long windowId)
@@ -461,6 +488,12 @@ public class DataList
return status;
}
+ @Override
+ public String toString()
+ {
+ return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}';
+ }
+
/**
* <p>Block class.</p>
*
@@ -484,7 +517,7 @@ public class DataList
/**
* The starting window which is available in this data array.
*/
- long starting_window = -1;
+ long starting_window;
/**
* the ending window which is available in this data array
*/
@@ -500,7 +533,8 @@ public class DataList
/**
 * count of references to this block.
*/
- int refCount;
+ AtomicInteger refCount;
+ Future future;
public Block(String id, int size)
{
@@ -509,9 +543,17 @@ public class DataList
public Block(String id, byte[] array)
{
+ this(id, array, -1, 0);
+ }
+
+ public Block(final String id, final byte[] array, final long starting_window, final long ending_window)
+ {
identifier = id;
data = array;
- refCount = 1;
+ refCount = new AtomicInteger(1);
+ this.starting_window = starting_window;
+ this.ending_window = ending_window;
+ //logger.debug("Allocated new {}", this);
}
void getNextData(SerializedData current)
@@ -530,27 +572,28 @@ public class DataList
public long rewind(long windowId)
{
long bs = starting_window & 0x7fffffff00000000L;
- DataListIterator dli = getIterator(this);
- done:
- while (dli.hasNext()) {
- SerializedData sd = dli.next();
- switch (sd.buffer[sd.dataOffset]) {
- case MessageType.RESET_WINDOW_VALUE:
- ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
- bs = (long)rwt.getBaseSeconds() << 32;
- if (bs > windowId) {
- writingOffset = sd.offset;
- break done;
- }
- break;
+ try (DataListIterator dli = getIterator(this)) {
+ done:
+ while (dli.hasNext()) {
+ SerializedData sd = dli.next();
+ switch (sd.buffer[sd.dataOffset]) {
+ case MessageType.RESET_WINDOW_VALUE:
+ ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ bs = (long)rwt.getBaseSeconds() << 32;
+ if (bs > windowId) {
+ writingOffset = sd.offset;
+ break done;
+ }
+ break;
- case MessageType.BEGIN_WINDOW_VALUE:
- BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
- if ((bs | bwt.getWindowId()) >= windowId) {
- writingOffset = sd.offset;
- break done;
- }
- break;
+ case MessageType.BEGIN_WINDOW_VALUE:
+ BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ if ((bs | bwt.getWindowId()) >= windowId) {
+ writingOffset = sd.offset;
+ break done;
+ }
+ break;
+ }
}
}
@@ -558,16 +601,12 @@ public class DataList
starting_window = windowId;
ending_window = windowId;
//logger.debug("assigned both window id {}", this);
- }
- else if (windowId < ending_window) {
+ } else if (windowId < ending_window) {
ending_window = windowId;
//logger.debug("assigned end window id {}", this);
}
- if (uniqueIdentifier != 0) {
- storage.discard(identifier, uniqueIdentifier);
- uniqueIdentifier = 0;
- }
+ discard(false);
return bs;
}
@@ -580,39 +619,40 @@ public class DataList
long bs = starting_window & 0xffffffff00000000L;
SerializedData lastReset = null;
- DataListIterator dli = getIterator(this);
- done:
- while (dli.hasNext()) {
- SerializedData sd = dli.next();
- switch (sd.buffer[sd.dataOffset]) {
- case MessageType.RESET_WINDOW_VALUE:
- ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
- bs = (long)rwt.getBaseSeconds() << 32;
- lastReset = sd;
- break;
-
- case MessageType.BEGIN_WINDOW_VALUE:
- BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
- if ((bs | bwt.getWindowId()) > longWindowId) {
- found = true;
- if (lastReset != null) {
+ try (DataListIterator dli = getIterator(this)) {
+ done:
+ while (dli.hasNext()) {
+ SerializedData sd = dli.next();
+ switch (sd.buffer[sd.dataOffset]) {
+ case MessageType.RESET_WINDOW_VALUE:
+ ResetWindowTuple rwt = (ResetWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ bs = (long)rwt.getBaseSeconds() << 32;
+ lastReset = sd;
+ break;
+
+ case MessageType.BEGIN_WINDOW_VALUE:
+ BeginWindowTuple bwt = (BeginWindowTuple) Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset);
+ if ((bs | bwt.getWindowId()) > longWindowId) {
+ found = true;
+ if (lastReset != null) {
/*
* Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple.
*/
- if (sd.offset >= lastReset.length) {
- sd.offset -= lastReset.length;
- if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
- System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length);
+ if (sd.offset >= lastReset.length) {
+ sd.offset -= lastReset.length;
+ if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
+ System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length);
+ }
}
+
+ this.starting_window = bs | bwt.getWindowId();
+ this.readingOffset = sd.offset;
+ //logger.debug("assigned starting window id {}", this);
}
- this.starting_window = bs | bwt.getWindowId();
- this.readingOffset = sd.offset;
- //logger.debug("assigned starting window id {}", this);
+ break done;
}
-
- break done;
- }
+ }
}
}
@@ -654,46 +694,44 @@ public class DataList
logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset);
}
- if (uniqueIdentifier != 0) {
- storage.discard(identifier, uniqueIdentifier);
- uniqueIdentifier = 0;
- }
+ discard(false);
}
}
- private Runnable getRetriever(final int uniqueIdentifier, final Storage storage)
+ private Runnable getRetriever()
{
return new Runnable()
{
@Override
public void run()
{
- byte[] lData = storage.retrieve(identifier, uniqueIdentifier);
+ byte[] data = storage.retrieve(identifier, uniqueIdentifier);
synchronized (Block.this) {
- data = lData;
+ Block.this.data = data;
readingOffset = 0;
writingOffset = data.length;
- if (refCount > 1) {
+ if (refCount.get() > 1) {
Block.this.notifyAll();
}
+ int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet();
+ if (numberOfInMemBlockPermits < 0) {
+ logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
+ }
}
}
-
};
}
- synchronized void acquire(boolean wait)
+ protected void acquire(boolean wait)
{
- if (refCount++ == 0 && uniqueIdentifier > 0 && storage != null) {
- assert (data == null);
+ if (refCount.getAndIncrement() == 0 && storage != null && data == null) {
+ final Runnable retriever = getRetriever();
if (wait) {
- getRetriever(uniqueIdentifier, storage).run();
+ retriever.run();
+ } else {
+ future = storageExecutor.submit(retriever);
}
- else {
- storageExecutor.submit(getRetriever(uniqueIdentifier, storage));
- }
- }
- else if (wait && data == null) {
+ } else if (wait && data == null) {
try {
wait();
}
@@ -710,35 +748,70 @@ public class DataList
@Override
public void run()
{
- int i = storage.store(identifier, data, readingOffset, writingOffset);
- if (i == 0) {
+ if (uniqueIdentifier == 0) {
+ uniqueIdentifier = storage.store(identifier, data, readingOffset, writingOffset);
+ }
+ if (uniqueIdentifier == 0) {
logger.warn("Storage returned unexpectedly, please check the status of the spool directory!");
}
else {
+ //logger.debug("Spooled {} to disk", Block.this);
synchronized (Block.this) {
- Block.this.uniqueIdentifier = i;
- if (refCount == 0) {
+ if (refCount.get() == 0) {
Block.this.data = null;
}
}
+ int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet();
+ assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
+ resumeSuspendedClients(numberOfInMemBlockPermits);
}
}
+ };
+ }
+
+ protected void release(boolean wait)
+ {
+ final int refCount = this.refCount.decrementAndGet();
+ if (refCount == 0 && storage != null) {
+ assert (next != null);
+ final Runnable storer = getStorer(data, readingOffset, writingOffset, storage);
+ if (wait && numberOfInMemBlockPermits.get() == 0) {
+ storer.run();
+ } else if (numberOfInMemBlockPermits.get() < MAX_COUNT_OF_INMEM_BLOCKS/2) {
+ future = storageExecutor.submit(storer);
+ }
+ } else {
+ logger.debug("Holding {} in memory due to {} references.", this, refCount);
+ }
+ }
+ private Runnable getDiscarder()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (uniqueIdentifier > 0) {
+ logger.debug("Discarding {}", Block.this);
+ storage.discard(identifier, uniqueIdentifier);
+ uniqueIdentifier = 0;
+ }
+ }
};
}
- synchronized void release(boolean wait)
+ protected void discard(final boolean wait)
{
- if (--refCount == 0 && storage != null) {
- if (uniqueIdentifier != 0) {
- data = null;
- return;
+ if (storage != null) {
+ if (future != null) {
+ future.cancel(false);
}
+ final Runnable discarder = getDiscarder();
if (wait) {
- getStorer(data, readingOffset, writingOffset, storage).run();
- }
- else {
- storageExecutor.submit(getStorer(data, readingOffset, writingOffset, storage));
+ discarder.run();
+ } else {
+ future = storageExecutor.submit(discarder);
}
}
}
@@ -746,10 +819,10 @@ public class DataList
@Override
public String toString()
{
- return "Block{" + "identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
+ return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length)
+ ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset
+ ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window)
- + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
+ + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier)
+ '}';
}
@@ -760,7 +833,7 @@ public class DataList
*
* @since 0.3.2
*/
- public class DataListIterator implements Iterator<SerializedData>
+ public class DataListIterator implements Iterator<SerializedData>, AutoCloseable
{
Block da;
SerializedData current;
@@ -792,12 +865,28 @@ public class DataList
return readOffset;
}
+ protected boolean switchToNextBlock()
+ {
+ Block next = getNextBlock(da);
+ if (next == null) {
+ return false;
+ }
+ //logger.debug("{}: switching to the next block {}->{}", this, da, da.next);
+ next.acquire(true);
+ da.release(false);
+ da = next;
+ size = 0;
+ buffer = da.data;
+ readOffset = da.readingOffset;
+ return true;
+ }
+
/**
*
* @return boolean
*/
@Override
- public synchronized boolean hasNext()
+ public boolean hasNext()
{
while (size == 0) {
size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset);
@@ -810,53 +899,26 @@ public class DataList
case -2:
case -1:
case 0:
- if (da.writingOffset == buffer.length) {
- if (da.next == null) {
- return false;
- }
-
- da.release(false);
- da.next.acquire(true);
- da = da.next;
- size = 0;
- buffer = da.data;
- readOffset = da.readingOffset;
- }
- else {
- return false;
+ if (da.writingOffset == buffer.length && switchToNextBlock()) {
+ continue;
}
+ return false;
}
}
- while (true) {
- if (nextOffset.integer + size <= da.writingOffset) {
- current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
- current.dataOffset = nextOffset.integer;
- //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
- // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
- // logger.debug("next t = {}", t);
- //}
- return true;
- }
- else {
- if (da.writingOffset == buffer.length) {
- if (da.next == null) {
- return false;
- }
- else {
- da.release(false);
- da.next.acquire(true);
- da = da.next;
- size = 0;
- readOffset = nextOffset.integer = da.readingOffset;
- buffer = da.data;
- }
- }
- else {
- return false;
- }
- }
+ if (nextOffset.integer + size <= da.writingOffset) {
+ current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
+ current.dataOffset = nextOffset.integer;
+ //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) {
+ // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset);
+ // logger.debug("next t = {}", t);
+ //}
+ return true;
+ } else if (da.writingOffset == buffer.length && switchToNextBlock()) {
+ nextOffset.integer = da.readingOffset;
+ return hasNext();
}
+ return false;
}
/**
@@ -882,6 +944,16 @@ public class DataList
current.buffer[current.dataOffset] = MessageType.NO_MESSAGE_VALUE;
}
+ @Override
+ public void close()
+ {
+ if (da != null) {
+ da.release(false);
+ da = null;
+ buffer = null;
+ }
+ }
+
void rewind(int processingOffset)
{
readOffset = processingOffset;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
index d260b37..fe0d9f4 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java
@@ -39,12 +39,6 @@ public class FastDataList extends DataList
super(identifier, blocksize, numberOfCacheBlocks);
}
- public FastDataList(String identifier, int blocksize, int numberOfCacheBlocks, int refCount)
- {
- super(identifier, blocksize, numberOfCacheBlocks, refCount);
- }
-
-
long item;
@Override
@@ -102,7 +96,7 @@ public class FastDataList extends DataList
last.writingOffset = writeOffset;
- autoflushExecutor.submit(new Runnable()
+ autoFlushExecutor.submit(new Runnable()
{
@Override
public void run()
@@ -116,7 +110,7 @@ public class FastDataList extends DataList
}
@Override
- public FastDataListIterator getIterator(Block block)
+ protected FastDataListIterator getIterator(Block block)
{
return new FastDataListIterator(block);
}
@@ -188,59 +182,37 @@ public class FastDataList extends DataList
}
@Override
- public synchronized boolean hasNext()
+ public boolean hasNext()
{
while (size == 0) {
if (da.writingOffset - readOffset >= 2) {
size = buffer[readOffset];
size |= (buffer[readOffset + 1] << 8);
- }
- else {
- if (da.writingOffset == buffer.length) {
- if (da.next == null) {
- return false;
- }
-
- da.release(false);
- da.next.acquire(true);
- da = da.next;
- size = 0;
- buffer = da.data;
- readOffset = da.readingOffset;
- }
- else {
+ } else {
+ if (da.writingOffset == buffer.length && switchToNextBlock()) {
+ continue;
+ } else {
return false;
}
}
}
- while (true) {
- if (readOffset + size + 2 <= da.writingOffset) {
- current = new SerializedData(buffer, readOffset, size + 2);
- current.dataOffset = readOffset + 2;
- return true;
- }
- else {
- if (da.writingOffset == buffer.length) {
- if (da.next == null) {
- return false;
- }
- else {
- da.release(false);
- da.next.acquire(true);
- da = da.next;
- size = 0;
- readOffset = nextOffset.integer = da.readingOffset;
- buffer = da.data;
- }
- }
- else {
+ if (readOffset + size + 2 <= da.writingOffset) {
+ current = new SerializedData(buffer, readOffset, size + 2);
+ current.dataOffset = readOffset + 2;
+ return true;
+ } else {
+ if (da.writingOffset == buffer.length) {
+ if (!switchToNextBlock()) {
return false;
}
+ nextOffset.integer = da.readingOffset;
+ return hasNext();
+ } else {
+ return false;
}
}
}
-
}
private static final Logger logger = LoggerFactory.getLogger(FastDataList.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1b8aecf3/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 7fb4823..33a2442 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -97,16 +97,16 @@ public class Server implements ServerListener
@Override
public void unregistered(SelectionKey key)
{
- serverHelperExecutor.shutdown();
- storageHelperExecutor.shutdown();
- try {
- serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ex) {
- logger.debug("Executor Termination", ex);
- }
- logger.info("Server stopped listening at {}", address);
- }
+ serverHelperExecutor.shutdown();
+ storageHelperExecutor.shutdown();
+ try {
+ serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex) {
+ logger.debug("Executor Termination", ex);
+ }
+ logger.info("Server stopped listening at {}", address);
+ }
public synchronized InetSocketAddress run(EventLoop eventloop)
{
@@ -262,7 +262,7 @@ public class Server implements ServerListener
//logger.debug("old list = {}", dl);
}
else {
- dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks, 0);
+ dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks);
publisherBuffers.put(upstream_identifier, dl);
//logger.debug("new list = {}", dl);
}
@@ -401,7 +401,7 @@ public class Server implements ServerListener
PublishRequestTuple publisherRequest = (PublishRequestTuple)request;
DataList dl = handlePublisherRequest(publisherRequest, this);
- dl.setAutoflushExecutor(serverHelperExecutor);
+ dl.setAutoFlushExecutor(serverHelperExecutor);
Publisher publisher;
if (publisherRequest.getVersion().equals(Tuple.FAST_VERSION)) {
@@ -616,6 +616,32 @@ public class Server implements ServerListener
dirty = true;
}
+ /**
+ * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ}
+ * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)}
+ * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key
+ * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}.
+ * @return true
+ */
+ @Override
+ public boolean resumeReadIfSuspended()
+ {
+ eventloop.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ final int interestOps = key.interestOps();
+ if ((interestOps & SelectionKey.OP_READ) == 0) {
+ logger.debug("Resuming read on key {} with attachment {}", key, key.attachment());
+ read(0);
+ key.interestOps(interestOps | SelectionKey.OP_READ);
+ }
+ }
+ });
+ return true;
+ }
+
@Override
public void read(int len)
{
@@ -634,7 +660,9 @@ public class Server implements ServerListener
* so we allocate a new byteBuffer and copy over the partially written data to the
* new byteBuffer and start as if we always had full room but not enough data.
*/
- switchToNewBuffer(buffer, readOffset);
+ if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) {
+ return;
+ }
}
}
else if (dirty) {
@@ -660,10 +688,13 @@ public class Server implements ServerListener
/*
* hit wall while writing serialized data, so have to allocate a new byteBuffer.
*/
- switchToNewBuffer(buffer, readOffset - VarInt.getSize(size));
+ if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) {
+ readOffset -= VarInt.getSize(size);
+ size = 0;
+ return;
+ }
size = 0;
- }
- else if (dirty) {
+ } else if (dirty) {
dirty = false;
datalist.flush(writeOffset);
}
@@ -673,21 +704,33 @@ public class Server implements ServerListener
while (true);
}
- public void switchToNewBuffer(byte[] array, int offset)
+ private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset)
{
- byte[] newBuffer = new byte[datalist.getBlockSize()];
- byteBuffer = ByteBuffer.wrap(newBuffer);
- if (array == null || array.length - offset == 0) {
- writeOffset = 0;
+ if (switchToNewBuffer(array, offset)) {
+ return true;
}
- else {
- writeOffset = array.length - offset;
- System.arraycopy(buffer, offset, newBuffer, 0, writeOffset);
- byteBuffer.position(writeOffset);
+ datalist.suspendRead(this);
+ return false;
+ }
+
+ private boolean switchToNewBuffer(final byte[] array, final int offset)
+ {
+ if (datalist.isMemoryBlockAvailable()) {
+ final byte[] newBuffer = datalist.newBuffer();
+ byteBuffer = ByteBuffer.wrap(newBuffer);
+ if (array == null || array.length - offset == 0) {
+ writeOffset = 0;
+ } else {
+ writeOffset = array.length - offset;
+ System.arraycopy(buffer, offset, newBuffer, 0, writeOffset);
+ byteBuffer.position(writeOffset);
+ }
+ buffer = newBuffer;
+ readOffset = 0;
+ datalist.addBuffer(buffer);
+ return true;
}
- buffer = newBuffer;
- readOffset = 0;
- datalist.addBuffer(buffer);
+ return false;
}
@Override
@@ -714,7 +757,7 @@ public class Server implements ServerListener
@Override
public String toString()
{
- return "Server.Publisher{" + "datalist=" + datalist + '}';
+ return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + datalist + '}';
}
private volatile boolean torndown;
[2/2] incubator-apex-core git commit: Merge branch 'APEX-68' of
github.com:vrozov/incubator-apex-core into vlad-apex-68
Posted by ch...@apache.org.
Merge branch 'APEX-68' of github.com:vrozov/incubator-apex-core into vlad-apex-68
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/928d3680
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/928d3680
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/928d3680
Branch: refs/heads/devel-3
Commit: 928d36804b695e35e8a59cc8acf3aabc23b296a9
Parents: 97cbef6 1b8aecf
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Thu Sep 10 17:58:24 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Thu Sep 10 17:58:24 2015 -0700
----------------------------------------------------------------------
.../bufferserver/internal/DataList.java | 598 +++++++++++--------
.../bufferserver/internal/FastDataList.java | 64 +-
.../datatorrent/bufferserver/server/Server.java | 101 +++-
3 files changed, 425 insertions(+), 338 deletions(-)
----------------------------------------------------------------------