You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/01 05:05:26 UTC
incubator-apex-core git commit: APEXCORE-365 - Log error when buffer
server receives a tuple whose length exceeds the buffer server data list
block size
Repository: incubator-apex-core
Updated Branches:
refs/heads/master cf4e29ea3 -> 6b78b57bd
APEXCORE-365 - Log error when buffer server receives a tuple whose length exceeds the buffer server data list block size
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6b78b57b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6b78b57b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6b78b57b
Branch: refs/heads/master
Commit: 6b78b57bd6d13cb855f7b767106d33c2b5f97885
Parents: cf4e29e
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Feb 29 13:11:49 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Feb 29 20:04:56 2016 -0800
----------------------------------------------------------------------
.../com/datatorrent/bufferserver/internal/DataList.java | 7 ++++++-
.../com/datatorrent/bufferserver/server/Server.java | 12 ++++++------
2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6b78b57b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 0de7261..95c32b0 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -447,8 +447,13 @@ public class DataList
return (storage == null) || (numberOfInMemBlockPermits.get() > 0);
}
- public byte[] newBuffer()
+ public byte[] newBuffer(final int size)
{
+ if (size > blockSize) {
+ logger.error("Tuple size {} exceeds buffer server current block size {}. Please decrease tuple size. " +
+ "Proceeding with allocating larger block that may cause out of memory exception.", size, blockSize);
+ return new byte[size];
+ }
return new byte[blockSize];
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6b78b57b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 03d96ee..c2da111 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -665,7 +665,7 @@ public class Server implements ServerListener
* so we allocate a new byteBuffer and copy over the partially written data to the
* new byteBuffer and start as if we always had full room but not enough data.
*/
- if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) {
+ if (!switchToNewBufferOrSuspendRead(buffer, readOffset, size)) {
return false;
}
}
@@ -694,7 +694,7 @@ public class Server implements ServerListener
/*
* hit wall while writing serialized data, so have to allocate a new byteBuffer.
*/
- if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) {
+ if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size), size)) {
readOffset -= VarInt.getSize(size);
size = 0;
return false;
@@ -710,19 +710,19 @@ public class Server implements ServerListener
while (true);
}
- private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset)
+ private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset, final int size)
{
- if (switchToNewBuffer(array, offset)) {
+ if (switchToNewBuffer(array, offset, size)) {
return true;
}
datalist.suspendRead(this);
return false;
}
- private boolean switchToNewBuffer(final byte[] array, final int offset)
+ private boolean switchToNewBuffer(final byte[] array, final int offset, final int size)
{
if (datalist.isMemoryBlockAvailable()) {
- final byte[] newBuffer = datalist.newBuffer();
+ final byte[] newBuffer = datalist.newBuffer(size);
byteBuffer = ByteBuffer.wrap(newBuffer);
if (array == null || array.length - offset == 0) {
writeOffset = 0;