You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/05 20:53:07 UTC
[2/6] flink git commit: [FLINK-6879] [runtime] Activate checkstyle
for runtime/memory
[FLINK-6879] [runtime] Activate checkstyle for runtime/memory
This closes #4097
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c150a6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c150a6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c150a6f
Branch: refs/heads/master
Commit: 7c150a6fc25962f2229ef1de256b668fa640a9b5
Parents: 2a3ab2b
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:29:35 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 5 15:22:48 2017 -0400
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../runtime/memory/AbstractPagedInputView.java | 115 +++++++++----------
.../runtime/memory/AbstractPagedOutputView.java | 97 ++++++++--------
.../runtime/memory/ListMemorySegmentSource.java | 13 +--
.../memory/MemoryAllocationException.java | 3 +-
.../flink/runtime/memory/MemoryManager.java | 109 +++++++++---------
.../MemoryManagerConcurrentModReleaseTest.java | 37 +++---
.../memory/MemoryManagerLazyAllocationTest.java | 59 +++++-----
.../flink/runtime/memory/MemoryManagerTest.java | 73 ++++++------
.../runtime/memory/MemorySegmentSimpleTest.java | 39 ++++---
10 files changed, 267 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 15b502e..0ee99e4 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -443,7 +443,6 @@ under the License.
**/runtime/jobmanager/**,
**/runtime/jobmaster/**,
**/runtime/leaderelection/**,
- **/runtime/memory/**,
**/runtime/messages/**,
**/runtime/minicluster/**,
**/runtime/operators/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
index 788eebb..0536e1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.memory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* The base class for all input views that are backed by multiple memory pages. This base class contains all
@@ -33,29 +32,29 @@ import org.apache.flink.core.memory.MemorySegment;
* implement the methods to provide the next memory page once the boundary is crossed.
*/
public abstract class AbstractPagedInputView implements DataInputView {
-
+
private MemorySegment currentSegment;
-
+
protected final int headerLength; // the number of bytes to skip at the beginning of each segment
-
+
private int positionInSegment; // the offset in the current segment
-
+
private int limitInSegment; // the limit in the current segment before switching to the next
-
+
private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-
-
+
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
-
+
/**
* Creates a new view that starts with the given segment. The input starts directly after the header
* of the given page. If the header size is zero, it starts at the beginning. The specified initial
* limit describes up to which position data may be read from the current segment, before the view must
* advance to the next segment.
- *
+ *
* @param initialSegment The memory segment to start reading from.
* @param initialLimit The position one after the last valid byte in the initial segment.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header. This
@@ -66,14 +65,14 @@ public abstract class AbstractPagedInputView implements DataInputView {
this.positionInSegment = headerLength;
seekInput(initialSegment, headerLength, initialLimit);
}
-
+
/**
* Creates a new view that is initially not bound to a memory segment. This constructor is typically
* for views that always seek first.
- * <p>
- * WARNING: The view is not readable until the first call to either {@link #advance()},
+ *
+ * <p>WARNING: The view is not readable until the first call to either {@link #advance()},
* or to {@link #seekInput(MemorySegment, int, int)}.
- *
+ *
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedInputView(int headerLength) {
@@ -83,73 +82,73 @@ public abstract class AbstractPagedInputView implements DataInputView {
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------
-
+
/**
* Gets the memory segment that will be used to read the next bytes from. If the segment is exactly exhausted,
* meaning that the last byte read was the last byte available in the segment, then this segment will
* not serve the next bytes. The segment to serve the next bytes will be obtained through the
* {@link #nextSegment(MemorySegment)} method.
- *
+ *
* @return The current memory segment.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}
-
+
/**
* Gets the position from which the next byte will be read. If that position is equal to the current limit,
* then the next byte will be read from next segment.
- *
+ *
* @return The position from which the next byte will be read.
* @see #getCurrentSegmentLimit()
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}
-
+
/**
* Gets the current limit in the memory segment. This value points to the byte one after the last valid byte
* in the memory segment.
- *
+ *
* @return The current limit in the memory segment.
* @see #getCurrentPositionInSegment()
*/
public int getCurrentSegmentLimit() {
return this.limitInSegment;
}
-
+
/**
* The method by which concrete subclasses realize page crossing. This method is invoked when the current page
* is exhausted and a new page is required to continue the reading. If no further page is available, this
* method must throw an {@link EOFException}.
- *
+ *
* @param current The current page that was read to its limit. May be {@code null}, if this method is
* invoked for the first time.
* @return The next page from which the reading should continue. May not be {@code null}. If the input is
* exhausted, an {@link EOFException} must be thrown instead.
- *
+ *
* @throws EOFException Thrown, if no further segment is available.
* @throws IOException Thrown, if the method cannot provide the next page due to an I/O related problem.
*/
protected abstract MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException;
-
+
/**
* Gets the limit for reading bytes from the given memory segment. This method must return the position
* of the byte after the last valid byte in the given memory segment. When the position returned by this
* method is reached, the view will attempt to switch to the next memory segment.
- *
+ *
* @param segment The segment to determine the limit for.
* @return The limit for the given memory segment.
*/
protected abstract int getLimitForSegment(MemorySegment segment);
-
+
/**
* Advances the view to the next memory segment. The reading will continue after the header of the next
* segment. This method uses {@link #nextSegment(MemorySegment)} and {@link #getLimitForSegment(MemorySegment)}
* to get the next segment and set its limit.
- *
+ *
* @throws IOException Thrown, if the next segment could not be obtained.
- *
+ *
* @see #nextSegment(MemorySegment)
* @see #getLimitForSegment(MemorySegment)
*/
@@ -160,11 +159,11 @@ public abstract class AbstractPagedInputView implements DataInputView {
this.limitInSegment = getLimitForSegment(this.currentSegment);
this.positionInSegment = this.headerLength;
}
-
+
/**
* Sets the internal state of the view such that the next bytes will be read from the given memory segment,
* starting at the given position. The memory segment will provide bytes up to the given limit position.
- *
+ *
* @param segment The segment to read the next bytes from.
* @param positionInSegment The position in the segment to start reading from.
* @param limitInSegment The limit in the segment. When reached, the view will attempt to switch to
@@ -175,7 +174,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
this.positionInSegment = positionInSegment;
this.limitInSegment = limitInSegment;
}
-
+
/**
* Clears the internal state of the view. After this call, all read attempts will fail, until the
* {@link #advance()} or {@link #seekInput(MemorySegment, int, int)} method have been invoked.
@@ -185,14 +184,14 @@ public abstract class AbstractPagedInputView implements DataInputView {
this.positionInSegment = this.headerLength;
this.limitInSegment = headerLength;
}
-
+
// --------------------------------------------------------------------------------------------
// Data Input Specific methods
// --------------------------------------------------------------------------------------------
@Override
public int read(byte[] b) throws IOException{
- return read(b,0,b.length);
+ return read(b, 0, b.length);
}
@Override
@@ -220,7 +219,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
int bytesRead = 0;
while (true) {
- int toRead = Math.min(remaining, len-bytesRead);
+ int toRead = Math.min(remaining, len - bytesRead);
this.currentSegment.get(this.positionInSegment, b, off, toRead);
off += toRead;
bytesRead += toRead;
@@ -243,7 +242,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
return len;
}
}
-
+
@Override
public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
@@ -251,9 +250,9 @@ public abstract class AbstractPagedInputView implements DataInputView {
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
- int bytesRead = read(b,off,len);
+ int bytesRead = read(b, off, len);
- if(bytesRead < len){
+ if (bytesRead < len){
throw new EOFException("There is no enough data left in the DataInputView.");
}
}
@@ -384,7 +383,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
@Override
public String readLine() throws IOException {
final StringBuilder bld = new StringBuilder(32);
-
+
try {
int b;
while ((b = readUnsignedByte()) != '\n') {
@@ -398,7 +397,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
if (bld.length() == 0) {
return null;
}
-
+
// trim a trailing carriage return
int len = bld.length();
if (len > 0 && bld.charAt(len - 1) == '\r') {
@@ -410,10 +409,10 @@ public abstract class AbstractPagedInputView implements DataInputView {
@Override
public String readUTF() throws IOException {
final int utflen = readUnsignedShort();
-
+
final byte[] bytearr;
final char[] chararr;
-
+
if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
bytearr = new byte[utflen];
this.utfByteBuffer = bytearr;
@@ -429,7 +428,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
int c, char2, char3;
int count = 0;
- int chararr_count = 0;
+ int chararrCount = 0;
readFully(bytearr, 0, utflen);
@@ -439,7 +438,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
break;
}
count++;
- chararr[chararr_count++] = (char) c;
+ chararr[chararrCount++] = (char) c;
}
while (count < utflen) {
@@ -455,7 +454,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
case 7:
/* 0xxxxxxx */
count++;
- chararr[chararr_count++] = (char) c;
+ chararr[chararrCount++] = (char) c;
break;
case 12:
case 13:
@@ -468,7 +467,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
}
- chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
@@ -481,7 +480,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
}
- chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+ chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
break;
default:
/* 10xx xxxx, 1111 xxxx */
@@ -489,15 +488,15 @@ public abstract class AbstractPagedInputView implements DataInputView {
}
}
// The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
+ return new String(chararr, 0, chararrCount);
}
-
+
@Override
public int skipBytes(int n) throws IOException {
if (n < 0) {
throw new IllegalArgumentException();
}
-
+
int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= n) {
this.positionInSegment += n;
@@ -512,20 +511,20 @@ public abstract class AbstractPagedInputView implements DataInputView {
}
remaining = this.limitInSegment - this.positionInSegment;
}
-
+
int skipped = 0;
while (true) {
int toSkip = Math.min(remaining, n);
n -= toSkip;
skipped += toSkip;
-
+
if (n > 0) {
try {
advance();
} catch (EOFException eofex) {
return skipped;
}
- remaining = this.limitInSegment - this.positionInSegment;
+ remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += toSkip;
@@ -541,7 +540,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
if (numBytes < 0) {
throw new IllegalArgumentException();
}
-
+
int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= numBytes) {
this.positionInSegment += numBytes;
@@ -551,12 +550,12 @@ public abstract class AbstractPagedInputView implements DataInputView {
advance();
remaining = this.limitInSegment - this.positionInSegment;
}
-
+
while (true) {
if (numBytes > remaining) {
numBytes -= remaining;
advance();
- remaining = this.limitInSegment - this.positionInSegment;
+ remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += numBytes;
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
index 482d82e..ddb909f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
@@ -16,46 +16,45 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.memory;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
/**
* The base class for all output views that are backed by multiple memory pages. This base class contains all
* encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
* implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
- * <p>
- * The paging assumes that all memory segments are of the same size.
+ *
+ * <p>The paging assumes that all memory segments are of the same size.
*/
public abstract class AbstractPagedOutputView implements DataOutputView {
-
+
private MemorySegment currentSegment; // the current memory segment to write to
-
+
protected final int segmentSize; // the size of the memory segments
-
+
protected final int headerLength; // the number of bytes to skip at the beginning of each segment
-
+
private int positionInSegment; // the offset in the current segment
-
+
private byte[] utfBuffer; // the reusable array for UTF encodings
-
-
+
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
-
+
/**
* Creates a new output view that writes initially to the given initial segment. All segments in the
* view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
* at the beginning of each segment.
- *
+ *
* @param initialSegment The segment that the view starts writing to.
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
@@ -69,81 +68,79 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
this.currentSegment = initialSegment;
this.positionInSegment = headerLength;
}
-
+
/**
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
- protected AbstractPagedOutputView(int segmentSize, int headerLength)
- {
+ protected AbstractPagedOutputView(int segmentSize, int headerLength) {
this.segmentSize = segmentSize;
this.headerLength = headerLength;
}
-
+
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------
-
+
/**
- *
* This method must return a segment. If no more segments are available, it must throw an
* {@link java.io.EOFException}.
- *
+ *
* @param current The current memory segment
* @param positionInCurrent The position in the segment, one after the last valid byte.
- * @return The next memory segment.
- *
+ * @return The next memory segment.
+ *
* @throws IOException
*/
protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
-
-
+
+
/**
* Gets the segment that the view currently writes to.
- *
+ *
* @return The segment the view currently writes to.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}
-
+
/**
* Gets the current write position (the position where the next bytes will be written)
* in the current memory segment.
- *
+ *
* @return The current write offset in the current memory segment.
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}
-
+
/**
* Gets the size of the segments used by this view.
- *
+ *
* @return The memory segment size.
*/
public int getSegmentSize() {
return this.segmentSize;
}
-
+
/**
* Moves the output view to the next page. This method invokes internally the
- * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
+ * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
* implementation and obtain the next segment to write to. Writing will continue inside the new segment
* after the header.
- *
+ *
* @throws IOException Thrown, if the current segment could not be processed or a new segment could not
- * be obtained.
+ * be obtained.
*/
protected void advance() throws IOException {
this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
this.positionInSegment = this.headerLength;
}
-
+
/**
- * Sets the internal state to the given memory segment and the given position within the segment.
- *
+ * Sets the internal state to the given memory segment and the given position within the segment.
+ *
* @param seg The memory segment to write the next bytes to.
* @param position The position to start writing the next bytes to.
*/
@@ -151,11 +148,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
this.currentSegment = seg;
this.positionInSegment = position;
}
-
+
/**
* Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
- * {@link #seekOutput(MemorySegment, int)} is called.
- *
+ * {@link #seekOutput(MemorySegment, int)} is called.
+ *
* @see #advance()
* @see #seekOutput(MemorySegment, int)
*/
@@ -163,11 +160,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
this.currentSegment = null;
this.positionInSegment = this.headerLength;
}
-
+
// --------------------------------------------------------------------------------------------
// Data Output Specific methods
// --------------------------------------------------------------------------------------------
-
+
@Override
public void write(int b) throws IOException {
writeByte(b);
@@ -195,11 +192,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
this.currentSegment.put(this.positionInSegment, b, off, toPut);
off += toPut;
len -= toPut;
-
+
if (len > 0) {
this.positionInSegment = this.segmentSize;
advance();
- remaining = this.segmentSize - this.positionInSegment;
+ remaining = this.segmentSize - this.positionInSegment;
}
else {
this.positionInSegment += toPut;
@@ -349,7 +346,7 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
final byte[] bytearr = this.utfBuffer;
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte) ( utflen & 0xFF);
+ bytearr[count++] = (byte) (utflen & 0xFF);
int i;
for (i = 0; i < strlen; i++) {
@@ -368,10 +365,10 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte) (0x80 | ( c & 0x3F));
+ bytearr[count++] = (byte) (0x80 | (c & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte) (0x80 | ( c & 0x3F));
+ bytearr[count++] = (byte) (0x80 | (c & 0x3F));
}
}
@@ -401,13 +398,13 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
this.positionInSegment += numBytes;
return;
}
-
+
if (remaining > 0) {
this.currentSegment.put(source, this.positionInSegment, remaining);
this.positionInSegment = this.segmentSize;
numBytes -= remaining;
}
-
+
advance();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
index ebeb47d..454e374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
@@ -16,26 +16,23 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.memory;
-import java.util.List;
-
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentSource;
+import java.util.List;
+
/**
* Simple memory segment source that draws segments from a list.
- *
+ *
*/
-public class ListMemorySegmentSource implements MemorySegmentSource
-{
+public class ListMemorySegmentSource implements MemorySegmentSource {
private final List<MemorySegment> segments;
-
+
public ListMemorySegmentSource(final List<MemorySegment> memorySegments) {
this.segments = memorySegments;
}
-
@Override
public MemorySegment nextSegment() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
index 112d6f7..5c9708b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.memory;
/**
* An exception to be thrown when a memory allocation operation is not successful.
*/
public class MemoryAllocationException extends Exception {
-
+
private static final long serialVersionUID = -403983866457947012L;
public MemoryAllocationException() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 81d410c..e53237a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -18,6 +18,15 @@
package org.apache.flink.runtime.memory;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -29,24 +38,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory
* is represented in segments of equal size. Operators allocate the memory by requesting a number
* of memory segments.
- * <p>
- * The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
+ *
+ * <p>The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
* memory regions ({@link HybridMemorySegment}). Which kind of memory the MemoryManager serves can
* be passed as an argument to the initialization.
- * <p>
- * The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
+ *
+ * <p>The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
* former version, memory will be occupied and reserved from start on, which means that no OutOfMemoryError
* can come while requesting memory. Released memory will also return to the MemoryManager's pool.
* On-demand allocation means that the memory manager only keeps track how many memory segments are
@@ -69,35 +70,35 @@ public class MemoryManager {
/** The memory pool from which we draw memory segments. Specific to on-heap or off-heap memory */
private final MemoryPool memoryPool;
-
- /** Memory segments allocated per memory owner */
+
+ /** Memory segments allocated per memory owner. */
private final HashMap<Object, Set<MemorySegment>> allocatedSegments;
- /** The type of memory governed by this memory manager */
+ /** The type of memory governed by this memory manager. */
private final MemoryType memoryType;
-
- /** mask used to round down sizes to multiples of the page size */
+
+ /** Mask used to round down sizes to multiples of the page size. */
private final long roundingMask;
- /** The size of the memory segments */
+ /** The size of the memory segments. */
private final int pageSize;
/** The initial total size, for verification. */
private final int totalNumPages;
- /** The total size of the memory managed by this memory manager */
+ /** The total size of the memory managed by this memory manager. */
private final long memorySize;
- /** Number of slots of the task manager */
+ /** Number of slots of the task manager. */
private final int numberOfSlots;
- /** Flag marking whether the memory manager immediately allocates the memory */
+ /** Flag marking whether the memory manager immediately allocates the memory. */
private final boolean isPreAllocated;
- /** The number of memory pages that have not been allocated and are available for lazy allocation */
+ /** The number of memory pages that have not been allocated and are available for lazy allocation. */
private int numNonAllocatedPages;
- /** flag whether the close() has already been invoked */
+ /** Flag whether the close() has already been invoked. */
private boolean isShutDown;
@@ -160,13 +161,13 @@ public class MemoryManager {
this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;
-
+
switch (memoryType) {
case HEAP:
this.memoryPool = new HeapMemoryPool(memToAllocate, pageSize);
break;
case OFF_HEAP:
- if(!preAllocateMemory) {
+ if (!preAllocateMemory) {
LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
" the memory type 'taskmanager.memory.off-heap' is set to true.");
}
@@ -189,8 +190,7 @@ public class MemoryManager {
*/
public void shutdown() {
// -------------------- BEGIN CRITICAL SECTION -------------------
- synchronized (lock)
- {
+ synchronized (lock) {
if (!isShutDown) {
// mark as shutdown and release memory
isShutDown = true;
@@ -202,7 +202,7 @@ public class MemoryManager {
seg.free();
}
}
-
+
memoryPool.clear();
}
}
@@ -242,7 +242,7 @@ public class MemoryManager {
*
* @param owner The owner to associate with the memory segment, for the fallback release.
* @param numPages The number of pages to allocate.
- * @return A list with the memory segments.
+ * @return A list with the memory segments.
* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
* of memory pages any more.
*/
@@ -256,7 +256,7 @@ public class MemoryManager {
* Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the
* segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated
* as part of this call.
- *
+ *
* @param owner The owner to associate with the memory segment, for the fallback release.
* @param target The list into which to put the allocated memory pages.
* @param numPages The number of pages to allocate.
@@ -264,8 +264,7 @@ public class MemoryManager {
* of memory pages any more.
*/
public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
- throws MemoryAllocationException
- {
+ throws MemoryAllocationException {
// sanity check
if (owner == null) {
throw new IllegalArgumentException("The memory owner must not be null.");
@@ -277,8 +276,7 @@ public class MemoryManager {
}
// -------------------- BEGIN CRITICAL SECTION -------------------
- synchronized (lock)
- {
+ synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -319,8 +317,8 @@ public class MemoryManager {
/**
* Tries to release the memory for the specified segment. If the segment has already been released or
* is null, the request is simply ignored.
- * <p>
- * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
+ *
+ * <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
* Otherwise, the segment is only freed and made eligible for reclamation by the GC.
*
* @param segment The segment to be released.
@@ -333,10 +331,9 @@ public class MemoryManager {
}
final Object owner = segment.getOwner();
-
+
// -------------------- BEGIN CRITICAL SECTION -------------------
- synchronized (lock)
- {
+ synchronized (lock) {
// prevent double return to this memory manager
if (segment.isFreed()) {
return;
@@ -374,10 +371,10 @@ public class MemoryManager {
/**
* Tries to release many memory segments together.
- * <p>
- * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
+ *
+ * <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
* Otherwise, the segment is only freed and made eligible for reclamation by the GC.
- *
+ *
* @param segments The segments to be released.
* @throws NullPointerException Thrown, if the given collection is null.
* @throws IllegalArgumentException Thrown, id the segments are of an incompatible type.
@@ -388,8 +385,7 @@ public class MemoryManager {
}
// -------------------- BEGIN CRITICAL SECTION -------------------
- synchronized (lock)
- {
+ synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -459,7 +455,7 @@ public class MemoryManager {
}
/**
- * Releases all memory segments for the given owner.
+ * Releases all memory segments for the given owner.
*
* @param owner The owner memory segments are to be released.
*/
@@ -469,8 +465,7 @@ public class MemoryManager {
}
// -------------------- BEGIN CRITICAL SECTION -------------------
- synchronized (lock)
- {
+ synchronized (lock) {
if (isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
@@ -507,7 +502,7 @@ public class MemoryManager {
/**
* Gets the type of memory (heap / off-heap) managed by this memory manager.
- *
+ *
* @return The type of memory managed by this memory manager.
*/
public MemoryType getMemoryType() {
@@ -556,14 +551,14 @@ public class MemoryManager {
* than the page size) is not included.
*
* @param fraction the fraction of the total memory per slot
- * @return The number of pages to which
+ * @return The number of pages to which
*/
public int computeNumberOfPages(double fraction) {
if (fraction <= 0 || fraction > 1) {
throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
}
- return (int)(totalNumPages * fraction / numberOfSlots);
+ return (int) (totalNumPages * fraction / numberOfSlots);
}
/**
@@ -584,13 +579,13 @@ public class MemoryManager {
public long roundDownToPageSizeMultiple(long numBytes) {
return numBytes & roundingMask;
}
-
+
// ------------------------------------------------------------------------
// Memory Pools
// ------------------------------------------------------------------------
- static abstract class MemoryPool {
+ abstract static class MemoryPool {
abstract int getNumberOfAvailableMemorySegments();
@@ -599,13 +594,13 @@ public class MemoryManager {
abstract MemorySegment requestSegmentFromPool(Object owner);
abstract void returnSegmentToPool(MemorySegment segment);
-
+
abstract void clear();
}
static final class HeapMemoryPool extends MemoryPool {
- /** The collection of available memory segments */
+ /** The collection of available memory segments. */
private final ArrayDeque<byte[]> availableMemory;
private final int segmentSize;
@@ -613,7 +608,7 @@ public class MemoryManager {
public HeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<byte[]>(numInitialSegments);
this.segmentSize = segmentSize;
-
+
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(new byte[segmentSize]);
}
@@ -652,10 +647,10 @@ public class MemoryManager {
availableMemory.clear();
}
}
-
+
static final class HybridOffHeapMemoryPool extends MemoryPool {
- /** The collection of available memory segments */
+ /** The collection of available memory segments. */
private final ArrayDeque<ByteBuffer> availableMemory;
private final int segmentSize;
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
index d1063cc..dac044e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
@@ -20,15 +20,18 @@ package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
+
import org.junit.Test;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.fail;
+/**
+ * Validate memory release under concurrent modification exceptions.
+ */
public class MemoryManagerConcurrentModReleaseTest {
@Test
@@ -49,33 +52,33 @@ public class MemoryManagerConcurrentModReleaseTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testConcurrentModificationWhileReleasing() {
try {
final int numSegments = 10000;
final int segmentSize = 4096;
-
+
MemoryManager memMan = new MemoryManager(numSegments * segmentSize, 1, segmentSize, MemoryType.HEAP, true);
-
+
ArrayList<MemorySegment> segs = new ArrayList<>(numSegments);
memMan.allocatePages(this, segs, numSegments);
-
+
// start a thread that performs concurrent modifications
Modifier mod = new Modifier(segs);
Thread modRunner = new Thread(mod);
modRunner.start();
-
+
// give the thread some time to start working
Thread.sleep(500);
-
+
try {
memMan.release(segs);
}
finally {
mod.cancel();
}
-
+
modRunner.join();
}
catch (Exception e) {
@@ -83,14 +86,13 @@ public class MemoryManagerConcurrentModReleaseTest {
fail(e.getMessage());
}
}
-
+
private class Modifier implements Runnable {
-
+
private final ArrayList<MemorySegment> toModify;
-
+
private volatile boolean running = true;
-
private Modifier(ArrayList<MemorySegment> toModify) {
this.toModify = toModify;
}
@@ -112,13 +114,13 @@ public class MemoryManagerConcurrentModReleaseTest {
}
}
}
-
+
private class ListWithConcModExceptionOnFirstAccess<E> extends ArrayList<E> {
private static final long serialVersionUID = -1623249699823349781L;
-
+
private boolean returnedIterator;
-
+
@Override
public Iterator<E> iterator() {
if (returnedIterator) {
@@ -130,8 +132,7 @@ public class MemoryManagerConcurrentModReleaseTest {
}
}
}
-
-
+
private class ConcFailingIterator<E> implements Iterator<E> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
index 15251e9..46d0851 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -38,20 +38,19 @@ import static org.junit.Assert.fail;
* Tests for the memory manager, in the mode where it pre-allocates all memory.
*/
public class MemoryManagerLazyAllocationTest {
-
+
private static final long RANDOM_SEED = 643196033469871L;
private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-
+
private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
private MemoryManager memoryManager;
private Random random;
-
@Before
public void setUp() {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, false);
@@ -72,7 +71,7 @@ public class MemoryManagerLazyAllocationTest {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
+
try {
for (int i = 0; i < NUM_PAGES; i++) {
segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,7 +80,7 @@ public class MemoryManagerLazyAllocationTest {
catch (MemoryAllocationException e) {
fail("Unable to allocate memory");
}
-
+
for (MemorySegment seg : segments) {
this.memoryManager.release(seg);
}
@@ -91,21 +90,21 @@ public class MemoryManagerLazyAllocationTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateAllMulti() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
+
try {
- for(int i = 0; i < NUM_PAGES / 2; i++) {
+ for (int i = 0; i < NUM_PAGES / 2; i++) {
segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
}
} catch (MemoryAllocationException e) {
Assert.fail("Unable to allocate memory");
}
-
+
this.memoryManager.release(segments);
}
catch (Exception e) {
@@ -113,37 +112,37 @@ public class MemoryManagerLazyAllocationTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateMultipleOwners() {
- final int NUM_OWNERS = 17;
-
+ final int numOwners = 17;
+
try {
- AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-
+ AbstractInvokable[] owners = new AbstractInvokable[numOwners];
+
@SuppressWarnings("unchecked")
- List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
-
- for (int i = 0; i < NUM_OWNERS; i++) {
+ List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];
+
+ for (int i = 0; i < numOwners; i++) {
owners[i] = new DummyInvokable();
mems[i] = new ArrayList<MemorySegment>(64);
}
-
+
// allocate all memory to the different owners
for (int i = 0; i < NUM_PAGES; i++) {
- final int owner = this.random.nextInt(NUM_OWNERS);
+ final int owner = this.random.nextInt(numOwners);
mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
}
-
+
// free one owner at a time
- for (int i = 0; i < NUM_OWNERS; i++) {
+ for (int i = 0; i < numOwners; i++) {
this.memoryManager.releaseAll(owners[i]);
owners[i] = null;
Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
mems[i] = null;
-
+
// check that the owner owners were not affected
- for (int k = i+1; k < NUM_OWNERS; k++) {
+ for (int k = i + 1; k < numOwners; k++) {
Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
}
}
@@ -153,24 +152,24 @@ public class MemoryManagerLazyAllocationTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateTooMuch() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
-
+
List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-
+
try {
this.memoryManager.allocatePages(mockInvoke, 1);
Assert.fail("Expected MemoryAllocationException.");
} catch (MemoryAllocationException maex) {
// expected
}
-
+
Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));
-
+
this.memoryManager.releaseAll(mockInvoke);
}
catch (Exception e) {
@@ -178,7 +177,7 @@ public class MemoryManagerLazyAllocationTest {
fail(e.getMessage());
}
}
-
+
private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (seg.isFreed()) {
@@ -187,7 +186,7 @@ public class MemoryManagerLazyAllocationTest {
}
return true;
}
-
+
private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (!seg.isFreed()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index a20a180..6847456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -18,40 +18,39 @@
package org.apache.flink.runtime.memory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.junit.Assert;
+
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
/**
* Tests for the memory manager, in the mode where it pre-allocates all memory.
*/
public class MemoryManagerTest {
-
+
private static final long RANDOM_SEED = 643196033469871L;
private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-
+
private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
private MemoryManager memoryManager;
private Random random;
-
@Before
public void setUp() {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
@@ -72,7 +71,7 @@ public class MemoryManagerTest {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
+
try {
for (int i = 0; i < NUM_PAGES; i++) {
segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,7 +80,7 @@ public class MemoryManagerTest {
catch (MemoryAllocationException e) {
fail("Unable to allocate memory");
}
-
+
this.memoryManager.release(segments);
}
catch (Exception e) {
@@ -89,21 +88,21 @@ public class MemoryManagerTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateAllMulti() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-
+
try {
- for(int i = 0; i < NUM_PAGES / 2; i++) {
+ for (int i = 0; i < NUM_PAGES / 2; i++) {
segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
}
} catch (MemoryAllocationException e) {
Assert.fail("Unable to allocate memory");
}
-
+
this.memoryManager.release(segments);
}
catch (Exception e) {
@@ -111,37 +110,37 @@ public class MemoryManagerTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateMultipleOwners() {
- final int NUM_OWNERS = 17;
-
+ final int numOwners = 17;
+
try {
- AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-
+ AbstractInvokable[] owners = new AbstractInvokable[numOwners];
+
@SuppressWarnings("unchecked")
- List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
-
- for (int i = 0; i < NUM_OWNERS; i++) {
+ List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];
+
+ for (int i = 0; i < numOwners; i++) {
owners[i] = new DummyInvokable();
mems[i] = new ArrayList<MemorySegment>(64);
}
-
+
// allocate all memory to the different owners
for (int i = 0; i < NUM_PAGES; i++) {
- final int owner = this.random.nextInt(NUM_OWNERS);
+ final int owner = this.random.nextInt(numOwners);
mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
}
-
+
// free one owner at a time
- for (int i = 0; i < NUM_OWNERS; i++) {
+ for (int i = 0; i < numOwners; i++) {
this.memoryManager.releaseAll(owners[i]);
owners[i] = null;
Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
mems[i] = null;
-
+
// check that the owner owners were not affected
- for (int k = i+1; k < NUM_OWNERS; k++) {
+ for (int k = i + 1; k < numOwners; k++) {
Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
}
}
@@ -151,24 +150,24 @@ public class MemoryManagerTest {
fail(e.getMessage());
}
}
-
+
@Test
public void allocateTooMuch() {
try {
final AbstractInvokable mockInvoke = new DummyInvokable();
-
+
List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-
+
try {
this.memoryManager.allocatePages(mockInvoke, 1);
Assert.fail("Expected MemoryAllocationException.");
} catch (MemoryAllocationException maex) {
// expected
}
-
+
Assert.assertTrue("The previously allocated segments were not valid any more.",
allMemorySegmentsValid(segs));
-
+
this.memoryManager.releaseAll(mockInvoke);
}
catch (Exception e) {
@@ -176,7 +175,7 @@ public class MemoryManagerTest {
fail(e.getMessage());
}
}
-
+
private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (seg.isFreed()) {
@@ -185,7 +184,7 @@ public class MemoryManagerTest {
}
return true;
}
-
+
private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
for (MemorySegment seg : memSegs) {
if (!seg.isFreed()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
index fad1b0e..3a07b44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -18,26 +18,29 @@
package org.apache.flink.runtime.memory;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.Random;
-
+import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.core.memory.MemorySegment;
-import org.junit.Assert;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test reading and writing primitive types to {@link MemorySegment}.
+ */
public class MemorySegmentSimpleTest {
-
+
public static final long RANDOM_SEED = 643196033469871L;
public static final int MANAGED_MEMORY_SIZE = 1024 * 1024 * 16;
@@ -67,7 +70,7 @@ public class MemorySegmentSimpleTest {
this.manager.release(this.segment);
this.random = null;
this.segment = null;
-
+
if (!this.manager.verifyEmpty()) {
Assert.fail("Not all memory has been properly released.");
}
@@ -430,7 +433,7 @@ public class MemorySegmentSimpleTest {
assertEquals(random.nextLong(), segment.getLong(i));
}
}
-
+
// test unaligned offsets
{
final long seed = random.nextLong();
@@ -440,7 +443,7 @@ public class MemorySegmentSimpleTest {
long value = random.nextLong();
segment.putLong(offset, value);
}
-
+
random.setSeed(seed);
for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
long shouldValue = random.nextLong();
@@ -547,22 +550,22 @@ public class MemorySegmentSimpleTest {
}
}
}
-
+
@Test
public void testByteBufferWrapping() {
try {
MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
-
+
ByteBuffer buf1 = seg.wrap(13, 47);
assertEquals(13, buf1.position());
assertEquals(60, buf1.limit());
assertEquals(47, buf1.remaining());
-
+
ByteBuffer buf2 = seg.wrap(500, 267);
assertEquals(500, buf2.position());
assertEquals(767, buf2.limit());
assertEquals(267, buf2.remaining());
-
+
ByteBuffer buf3 = seg.wrap(0, 1024);
assertEquals(0, buf3.position());
assertEquals(1024, buf3.limit());