You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/05 20:53:07 UTC

[2/6] flink git commit: [FLINK-6879] [runtime] Activate checkstyle for runtime/memory

[FLINK-6879] [runtime] Activate checkstyle for runtime/memory

This closes #4097


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c150a6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c150a6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c150a6f

Branch: refs/heads/master
Commit: 7c150a6fc25962f2229ef1de256b668fa640a9b5
Parents: 2a3ab2b
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:29:35 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 5 15:22:48 2017 -0400

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |   1 -
 .../runtime/memory/AbstractPagedInputView.java  | 115 +++++++++----------
 .../runtime/memory/AbstractPagedOutputView.java |  97 ++++++++--------
 .../runtime/memory/ListMemorySegmentSource.java |  13 +--
 .../memory/MemoryAllocationException.java       |   3 +-
 .../flink/runtime/memory/MemoryManager.java     | 109 +++++++++---------
 .../MemoryManagerConcurrentModReleaseTest.java  |  37 +++---
 .../memory/MemoryManagerLazyAllocationTest.java |  59 +++++-----
 .../flink/runtime/memory/MemoryManagerTest.java |  73 ++++++------
 .../runtime/memory/MemorySegmentSimpleTest.java |  39 ++++---
 10 files changed, 267 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 15b502e..0ee99e4 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -443,7 +443,6 @@ under the License.
 						**/runtime/jobmanager/**,
 						**/runtime/jobmaster/**,
 						**/runtime/leaderelection/**,
-						**/runtime/memory/**,
 						**/runtime/messages/**,
 						**/runtime/minicluster/**,
 						**/runtime/operators/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
index 788eebb..0536e1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memory;
 
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
-
 
 /**
  * The base class for all input views that are backed by multiple memory pages. This base class contains all
@@ -33,29 +32,29 @@ import org.apache.flink.core.memory.MemorySegment;
  * implement the methods to provide the next memory page once the boundary is crossed.
  */
 public abstract class AbstractPagedInputView implements DataInputView {
-	
+
 	private MemorySegment currentSegment;
-	
+
 	protected final int headerLength;				// the number of bytes to skip at the beginning of each segment
-	
+
 	private int positionInSegment;					// the offset in the current segment
-	
+
 	private int limitInSegment;						// the limit in the current segment before switching to the next
-	
+
 	private byte[] utfByteBuffer;					// reusable byte buffer for utf-8 decoding
 	private char[] utfCharBuffer;					// reusable char buffer for utf-8 decoding
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	//                                    Constructors
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new view that starts with the given segment. The input starts directly after the header
 	 * of the given page. If the header size is zero, it starts at the beginning. The specified initial
 	 * limit describes up to which position data may be read from the current segment, before the view must
 	 * advance to the next segment.
-	 * 
+	 *
 	 * @param initialSegment The memory segment to start reading from.
 	 * @param initialLimit The position one after the last valid byte in the initial segment.
 	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header. This
@@ -66,14 +65,14 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		this.positionInSegment = headerLength;
 		seekInput(initialSegment, headerLength, initialLimit);
 	}
-	
+
 	/**
 	 * Creates a new view that is initially not bound to a memory segment. This constructor is typically
 	 * for views that always seek first.
-	 * <p>
-	 * WARNING: The view is not readable until the first call to either {@link #advance()}, 
+	 *
+	 * <p>WARNING: The view is not readable until the first call to either {@link #advance()},
 	 * or to {@link #seekInput(MemorySegment, int, int)}.
-	 * 
+	 *
 	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
 	 */
 	protected AbstractPagedInputView(int headerLength) {
@@ -83,73 +82,73 @@ public abstract class AbstractPagedInputView implements DataInputView {
 	// --------------------------------------------------------------------------------------------
 	//                                  Page Management
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the memory segment that will be used to read the next bytes from. If the segment is exactly exhausted,
 	 * meaning that the last byte read was the last byte available in the segment, then this segment will
 	 * not serve the next bytes. The segment to serve the next bytes will be obtained through the
 	 * {@link #nextSegment(MemorySegment)} method.
-	 * 
+	 *
 	 * @return The current memory segment.
 	 */
 	public MemorySegment getCurrentSegment() {
 		return this.currentSegment;
 	}
-	
+
 	/**
 	 * Gets the position from which the next byte will be read. If that position is equal to the current limit,
 	 * then the next byte will be read from next segment.
-	 * 
+	 *
 	 * @return The position from which the next byte will be read.
 	 * @see #getCurrentSegmentLimit()
 	 */
 	public int getCurrentPositionInSegment() {
 		return this.positionInSegment;
 	}
-	
+
 	/**
 	 * Gets the current limit in the memory segment. This value points to the byte one after the last valid byte
 	 * in the memory segment.
-	 * 
+	 *
 	 * @return The current limit in the memory segment.
 	 * @see #getCurrentPositionInSegment()
 	 */
 	public int getCurrentSegmentLimit() {
 		return this.limitInSegment;
 	}
-	
+
 	/**
 	 * The method by which concrete subclasses realize page crossing. This method is invoked when the current page
 	 * is exhausted and a new page is required to continue the reading. If no further page is available, this
 	 * method must throw an {@link EOFException}.
-	 *  
+	 *
 	 * @param current The current page that was read to its limit. May be {@code null}, if this method is
 	 *                invoked for the first time.
 	 * @return The next page from which the reading should continue. May not be {@code null}. If the input is
 	 *         exhausted, an {@link EOFException} must be thrown instead.
-	 *         
+	 *
 	 * @throws EOFException Thrown, if no further segment is available.
 	 * @throws IOException Thrown, if the method cannot provide the next page due to an I/O related problem.
 	 */
 	protected abstract MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException;
-	
+
 	/**
 	 * Gets the limit for reading bytes from the given memory segment. This method must return the position
 	 * of the byte after the last valid byte in the given memory segment. When the position returned by this
 	 * method is reached, the view will attempt to switch to the next memory segment.
-	 * 
+	 *
 	 * @param segment The segment to determine the limit for.
 	 * @return The limit for the given memory segment.
 	 */
 	protected abstract int getLimitForSegment(MemorySegment segment);
-	
+
 	/**
 	 * Advances the view to the next memory segment. The reading will continue after the header of the next
 	 * segment. This method uses {@link #nextSegment(MemorySegment)} and {@link #getLimitForSegment(MemorySegment)}
 	 * to get the next segment and set its limit.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the next segment could not be obtained.
-	 * 
+	 *
 	 * @see #nextSegment(MemorySegment)
 	 * @see #getLimitForSegment(MemorySegment)
 	 */
@@ -160,11 +159,11 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		this.limitInSegment = getLimitForSegment(this.currentSegment);
 		this.positionInSegment = this.headerLength;
 	}
-	
+
 	/**
 	 * Sets the internal state of the view such that the next bytes will be read from the given memory segment,
 	 * starting at the given position. The memory segment will provide bytes up to the given limit position.
-	 * 
+	 *
 	 * @param segment The segment to read the next bytes from.
 	 * @param positionInSegment The position in the segment to start reading from.
 	 * @param limitInSegment The limit in the segment. When reached, the view will attempt to switch to
@@ -175,7 +174,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		this.positionInSegment = positionInSegment;
 		this.limitInSegment = limitInSegment;
 	}
-	
+
 	/**
 	 * Clears the internal state of the view. After this call, all read attempts will fail, until the
 	 * {@link #advance()} or {@link #seekInput(MemorySegment, int, int)} method have been invoked.
@@ -185,14 +184,14 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		this.positionInSegment = this.headerLength;
 		this.limitInSegment = headerLength;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//                               Data Input Specific methods
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public int read(byte[] b) throws IOException{
-		return read(b,0,b.length);
+		return read(b, 0, b.length);
 	}
 
 	@Override
@@ -220,7 +219,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 
 			int bytesRead = 0;
 			while (true) {
-				int toRead = Math.min(remaining, len-bytesRead);
+				int toRead = Math.min(remaining, len - bytesRead);
 				this.currentSegment.get(this.positionInSegment, b, off, toRead);
 				off += toRead;
 				bytesRead += toRead;
@@ -243,7 +242,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 			return len;
 		}
 	}
-	
+
 	@Override
 	public void readFully(byte[] b) throws IOException {
 		readFully(b, 0, b.length);
@@ -251,9 +250,9 @@ public abstract class AbstractPagedInputView implements DataInputView {
 
 	@Override
 	public void readFully(byte[] b, int off, int len) throws IOException {
-		int bytesRead = read(b,off,len);
+		int bytesRead = read(b, off, len);
 
-		if(bytesRead < len){
+		if (bytesRead < len){
 			throw new EOFException("There is no enough data left in the DataInputView.");
 		}
 	}
@@ -384,7 +383,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 	@Override
 	public String readLine() throws IOException {
 		final StringBuilder bld = new StringBuilder(32);
-		
+
 		try {
 			int b;
 			while ((b = readUnsignedByte()) != '\n') {
@@ -398,7 +397,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		if (bld.length() == 0) {
 			return null;
 		}
-		
+
 		// trim a trailing carriage return
 		int len = bld.length();
 		if (len > 0 && bld.charAt(len - 1) == '\r') {
@@ -410,10 +409,10 @@ public abstract class AbstractPagedInputView implements DataInputView {
 	@Override
 	public String readUTF() throws IOException {
 		final int utflen = readUnsignedShort();
-		
+
 		final byte[] bytearr;
 		final char[] chararr;
-		
+
 		if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
 			bytearr = new byte[utflen];
 			this.utfByteBuffer = bytearr;
@@ -429,7 +428,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 
 		int c, char2, char3;
 		int count = 0;
-		int chararr_count = 0;
+		int chararrCount = 0;
 
 		readFully(bytearr, 0, utflen);
 
@@ -439,7 +438,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 				break;
 			}
 			count++;
-			chararr[chararr_count++] = (char) c;
+			chararr[chararrCount++] = (char) c;
 		}
 
 		while (count < utflen) {
@@ -455,7 +454,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 			case 7:
 				/* 0xxxxxxx */
 				count++;
-				chararr[chararr_count++] = (char) c;
+				chararr[chararrCount++] = (char) c;
 				break;
 			case 12:
 			case 13:
@@ -468,7 +467,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 				if ((char2 & 0xC0) != 0x80) {
 					throw new UTFDataFormatException("malformed input around byte " + count);
 				}
-				chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+				chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
 				break;
 			case 14:
 				/* 1110 xxxx 10xx xxxx 10xx xxxx */
@@ -481,7 +480,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 				}
-				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+				chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
 				break;
 			default:
 				/* 10xx xxxx, 1111 xxxx */
@@ -489,15 +488,15 @@ public abstract class AbstractPagedInputView implements DataInputView {
 			}
 		}
 		// The number of chars produced may be less than utflen
-		return new String(chararr, 0, chararr_count);
+		return new String(chararr, 0, chararrCount);
 	}
-	
+
 	@Override
 	public int skipBytes(int n) throws IOException {
 		if (n < 0) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		int remaining = this.limitInSegment - this.positionInSegment;
 		if (remaining >= n) {
 			this.positionInSegment += n;
@@ -512,20 +511,20 @@ public abstract class AbstractPagedInputView implements DataInputView {
 				}
 				remaining = this.limitInSegment - this.positionInSegment;
 			}
-			
+
 			int skipped = 0;
 			while (true) {
 				int toSkip = Math.min(remaining, n);
 				n -= toSkip;
 				skipped += toSkip;
-				
+
 				if (n > 0) {
 					try {
 						advance();
 					} catch (EOFException eofex) {
 						return skipped;
 					}
-					remaining = this.limitInSegment - this.positionInSegment;	
+					remaining = this.limitInSegment - this.positionInSegment;
 				}
 				else {
 					this.positionInSegment += toSkip;
@@ -541,7 +540,7 @@ public abstract class AbstractPagedInputView implements DataInputView {
 		if (numBytes < 0) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		int remaining = this.limitInSegment - this.positionInSegment;
 		if (remaining >= numBytes) {
 			this.positionInSegment += numBytes;
@@ -551,12 +550,12 @@ public abstract class AbstractPagedInputView implements DataInputView {
 				advance();
 				remaining = this.limitInSegment - this.positionInSegment;
 			}
-			
+
 			while (true) {
 				if (numBytes > remaining) {
 					numBytes -= remaining;
 					advance();
-					remaining = this.limitInSegment - this.positionInSegment;	
+					remaining = this.limitInSegment - this.positionInSegment;
 				}
 				else {
 					this.positionInSegment += numBytes;

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
index 482d82e..ddb909f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedOutputView.java
@@ -16,46 +16,45 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memory;
 
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
 
 /**
  * The base class for all output views that are backed by multiple memory pages. This base class contains all
  * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
  * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
- * <p>
- * The paging assumes that all memory segments are of the same size.
+ *
+ * <p>The paging assumes that all memory segments are of the same size.
  */
 public abstract class AbstractPagedOutputView implements DataOutputView {
-	
+
 	private MemorySegment currentSegment;			// the current memory segment to write to
-	
+
 	protected final int segmentSize;				// the size of the memory segments
-	
+
 	protected final int headerLength;				// the number of bytes to skip at the beginning of each segment
-	
+
 	private int positionInSegment;					// the offset in the current segment
-	
+
 	private byte[] utfBuffer;						// the reusable array for UTF encodings
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	//                                    Constructors
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new output view that writes initially to the given initial segment. All segments in the
 	 * view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
 	 * at the beginning of each segment.
-	 * 
+	 *
 	 * @param initialSegment The segment that the view starts writing to.
 	 * @param segmentSize The size of the memory segments.
 	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
@@ -69,81 +68,79 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 		this.currentSegment = initialSegment;
 		this.positionInSegment = headerLength;
 	}
-	
+
 	/**
 	 * @param segmentSize The size of the memory segments.
 	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
 	 */
-	protected AbstractPagedOutputView(int segmentSize, int headerLength)
-	{
+	protected AbstractPagedOutputView(int segmentSize, int headerLength) {
 		this.segmentSize = segmentSize;
 		this.headerLength = headerLength;
 	}
-	
+
 
 	// --------------------------------------------------------------------------------------------
 	//                                  Page Management
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * 
 	 * This method must return a segment. If no more segments are available, it must throw an
 	 * {@link java.io.EOFException}.
-	 * 
+	 *
 	 * @param current The current memory segment
 	 * @param positionInCurrent The position in the segment, one after the last valid byte.
-	 * @return The next memory segment. 
-	 * 
+	 * @return The next memory segment.
+	 *
 	 * @throws IOException
 	 */
 	protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
-	
-	
+
+
 	/**
 	 * Gets the segment that the view currently writes to.
-	 * 
+	 *
 	 * @return The segment the view currently writes to.
 	 */
 	public MemorySegment getCurrentSegment() {
 		return this.currentSegment;
 	}
-	
+
 	/**
 	 * Gets the current write position (the position where the next bytes will be written)
 	 * in the current memory segment.
-	 * 
+	 *
 	 * @return The current write offset in the current memory segment.
 	 */
 	public int getCurrentPositionInSegment() {
 		return this.positionInSegment;
 	}
-	
+
 	/**
 	 * Gets the size of the segments used by this view.
-	 * 
+	 *
 	 * @return The memory segment size.
 	 */
 	public int getSegmentSize() {
 		return this.segmentSize;
 	}
-	
+
 	/**
 	 * Moves the output view to the next page. This method invokes internally the
-	 * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass' 
+	 * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
 	 * implementation and obtain the next segment to write to. Writing will continue inside the new segment
 	 * after the header.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the current segment could not be processed or a new segment could not
-	 *                     be obtained. 
+	 *                     be obtained.
 	 */
 	protected void advance() throws IOException {
 		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
 		this.positionInSegment = this.headerLength;
 	}
-	
+
 	/**
-	 * Sets the internal state to the given memory segment and the given position within the segment. 
-	 * 
+	 * Sets the internal state to the given memory segment and the given position within the segment.
+	 *
 	 * @param seg The memory segment to write the next bytes to.
 	 * @param position The position to start writing the next bytes to.
 	 */
@@ -151,11 +148,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 		this.currentSegment = seg;
 		this.positionInSegment = position;
 	}
-	
+
 	/**
 	 * Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
-	 * {@link #seekOutput(MemorySegment, int)} is called. 
-	 * 
+	 * {@link #seekOutput(MemorySegment, int)} is called.
+	 *
 	 * @see #advance()
 	 * @see #seekOutput(MemorySegment, int)
 	 */
@@ -163,11 +160,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 		this.currentSegment = null;
 		this.positionInSegment = this.headerLength;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//                               Data Output Specific methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void write(int b) throws IOException {
 		writeByte(b);
@@ -195,11 +192,11 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 				this.currentSegment.put(this.positionInSegment, b, off, toPut);
 				off += toPut;
 				len -= toPut;
-				
+
 				if (len > 0) {
 					this.positionInSegment = this.segmentSize;
 					advance();
-					remaining = this.segmentSize - this.positionInSegment;	
+					remaining = this.segmentSize - this.positionInSegment;
 				}
 				else {
 					this.positionInSegment += toPut;
@@ -349,7 +346,7 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 		final byte[] bytearr = this.utfBuffer;
 
 		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) ( utflen        & 0xFF);
+		bytearr[count++] = (byte) (utflen & 0xFF);
 
 		int i;
 		for (i = 0; i < strlen; i++) {
@@ -368,10 +365,10 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 			} else if (c > 0x07FF) {
 				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
 				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | ( c       & 0x3F));
+				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
 			} else {
 				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | ( c       & 0x3F));
+				bytearr[count++] = (byte) (0x80 | (c & 0x3F));
 			}
 		}
 
@@ -401,13 +398,13 @@ public abstract class AbstractPagedOutputView implements DataOutputView {
 				this.positionInSegment += numBytes;
 				return;
 			}
-			
+
 			if (remaining > 0) {
 				this.currentSegment.put(source, this.positionInSegment, remaining);
 				this.positionInSegment = this.segmentSize;
 				numBytes -= remaining;
 			}
-			
+
 			advance();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
index ebeb47d..454e374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/ListMemorySegmentSource.java
@@ -16,26 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memory;
 
-import java.util.List;
-
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 
+import java.util.List;
+
 /**
  * Simple memory segment source that draws segments from a list.
- * 
+ *
  */
-public class ListMemorySegmentSource implements MemorySegmentSource
-{
+public class ListMemorySegmentSource implements MemorySegmentSource {
 	private final List<MemorySegment> segments;
-	
+
 	public ListMemorySegmentSource(final List<MemorySegment> memorySegments) {
 		this.segments = memorySegments;
 	}
-	
 
 	@Override
 	public MemorySegment nextSegment() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
index 112d6f7..5c9708b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryAllocationException.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memory;
 
 /**
  * An exception to be thrown when a memory allocation operation is not successful.
  */
 public class MemoryAllocationException extends Exception {
-	
+
 	private static final long serialVersionUID = -403983866457947012L;
 
 	public MemoryAllocationException() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 81d410c..e53237a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -18,6 +18,15 @@
 
 package org.apache.flink.runtime.memory;
 
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.HybridMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -29,24 +38,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.HybridMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * The memory manager governs the memory that Flink uses for sorting, hashing, and caching. Memory
  * is represented in segments of equal size. Operators allocate the memory by requesting a number
  * of memory segments.
- * <p>
- * The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
+ *
+ * <p>The memory may be represented as on-heap byte arrays ({@link HeapMemorySegment}), or as off-heap
  * memory regions ({@link HybridMemorySegment}). Which kind of memory the MemoryManager serves can
  * be passed as an argument to the initialization.
- * <p>
- * The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
+ *
+ * <p>The memory manager can either pre-allocate all memory, or allocate the memory on demand. In the
  * former version, memory will be occupied and reserved from start on, which means that no OutOfMemoryError
  * can come while requesting memory. Released memory will also return to the MemoryManager's pool.
  * On-demand allocation means that the memory manager only keeps track how many memory segments are
@@ -69,35 +70,35 @@ public class MemoryManager {
 
 	/** The memory pool from which we draw memory segments. Specific to on-heap or off-heap memory */
 	private final MemoryPool memoryPool;
-	
-	/** Memory segments allocated per memory owner */
+
+	/** Memory segments allocated per memory owner. */
 	private final HashMap<Object, Set<MemorySegment>> allocatedSegments;
 
-	/** The type of memory governed by this memory manager */
+	/** The type of memory governed by this memory manager. */
 	private final MemoryType memoryType;
-	
-	/** mask used to round down sizes to multiples of the page size */
+
+	/** Mask used to round down sizes to multiples of the page size. */
 	private final long roundingMask;
 
-	/** The size of the memory segments */
+	/** The size of the memory segments. */
 	private final int pageSize;
 
 	/** The initial total size, for verification. */
 	private final int totalNumPages;
 
-	/** The total size of the memory managed by this memory manager */
+	/** The total size of the memory managed by this memory manager. */
 	private final long memorySize;
 
-	/** Number of slots of the task manager */
+	/** Number of slots of the task manager. */
 	private final int numberOfSlots;
 
-	/** Flag marking whether the memory manager immediately allocates the memory */
+	/** Flag marking whether the memory manager immediately allocates the memory. */
 	private final boolean isPreAllocated;
 
-	/** The number of memory pages that have not been allocated and are available for lazy allocation */
+	/** The number of memory pages that have not been allocated and are available for lazy allocation. */
 	private int numNonAllocatedPages;
 
-	/** flag whether the close() has already been invoked */
+	/** Flag whether the close() has already been invoked. */
 	private boolean isShutDown;
 
 
@@ -160,13 +161,13 @@ public class MemoryManager {
 
 		this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
 		final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;
-		
+
 		switch (memoryType) {
 			case HEAP:
 				this.memoryPool = new HeapMemoryPool(memToAllocate, pageSize);
 				break;
 			case OFF_HEAP:
-				if(!preAllocateMemory) {
+				if (!preAllocateMemory) {
 					LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
 						" the memory type 'taskmanager.memory.off-heap' is set to true.");
 				}
@@ -189,8 +190,7 @@ public class MemoryManager {
 	 */
 	public void shutdown() {
 		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock)
-		{
+		synchronized (lock) {
 			if (!isShutDown) {
 				// mark as shutdown and release memory
 				isShutDown = true;
@@ -202,7 +202,7 @@ public class MemoryManager {
 						seg.free();
 					}
 				}
-				
+
 				memoryPool.clear();
 			}
 		}
@@ -242,7 +242,7 @@ public class MemoryManager {
 	 *
 	 * @param owner The owner to associate with the memory segment, for the fallback release.
 	 * @param numPages The number of pages to allocate.
-	 * @return A list with the memory segments.   
+	 * @return A list with the memory segments.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
 	 */
@@ -256,7 +256,7 @@ public class MemoryManager {
 	 * Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the
 	 * segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated
 	 * as part of this call.
-	 * 
+	 *
 	 * @param owner The owner to associate with the memory segment, for the fallback release.
 	 * @param target The list into which to put the allocated memory pages.
 	 * @param numPages The number of pages to allocate.
@@ -264,8 +264,7 @@ public class MemoryManager {
 	 *                                   of memory pages any more.
 	 */
 	public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
-			throws MemoryAllocationException
-	{
+			throws MemoryAllocationException {
 		// sanity check
 		if (owner == null) {
 			throw new IllegalArgumentException("The memory owner must not be null.");
@@ -277,8 +276,7 @@ public class MemoryManager {
 		}
 
 		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock)
-		{
+		synchronized (lock) {
 			if (isShutDown) {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
@@ -319,8 +317,8 @@ public class MemoryManager {
 	/**
 	 * Tries to release the memory for the specified segment. If the segment has already been released or
 	 * is null, the request is simply ignored.
-	 * <p>
-	 * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
+	 *
+	 * <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
 	 * Otherwise, the segment is only freed and made eligible for reclamation by the GC.
 	 *
 	 * @param segment The segment to be released.
@@ -333,10 +331,9 @@ public class MemoryManager {
 		}
 
 		final Object owner = segment.getOwner();
-		
+
 		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock)
-		{
+		synchronized (lock) {
 			// prevent double return to this memory manager
 			if (segment.isFreed()) {
 				return;
@@ -374,10 +371,10 @@ public class MemoryManager {
 
 	/**
 	 * Tries to release many memory segments together.
-	 * <p>
-	 * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
+	 *
+	 * <p>If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
 	 * Otherwise, the segment is only freed and made eligible for reclamation by the GC.
-	 * 
+	 *
 	 * @param segments The segments to be released.
 	 * @throws NullPointerException Thrown, if the given collection is null.
 	 * @throws IllegalArgumentException Thrown, id the segments are of an incompatible type.
@@ -388,8 +385,7 @@ public class MemoryManager {
 		}
 
 		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock)
-		{
+		synchronized (lock) {
 			if (isShutDown) {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
@@ -459,7 +455,7 @@ public class MemoryManager {
 	}
 
 	/**
-	 * Releases all memory segments for the given owner. 
+	 * Releases all memory segments for the given owner.
 	 *
 	 * @param owner The owner memory segments are to be released.
 	 */
@@ -469,8 +465,7 @@ public class MemoryManager {
 		}
 
 		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock)
-		{
+		synchronized (lock) {
 			if (isShutDown) {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
@@ -507,7 +502,7 @@ public class MemoryManager {
 
 	/**
 	 * Gets the type of memory (heap / off-heap) managed by this memory manager.
-	 * 
+	 *
 	 * @return The type of memory managed by this memory manager.
 	 */
 	public MemoryType getMemoryType() {
@@ -556,14 +551,14 @@ public class MemoryManager {
 	 * than the page size) is not included.
 	 *
 	 * @param fraction the fraction of the total memory per slot
-	 * @return The number of pages to which 
+	 * @return The number of pages to which
 	 */
 	public int computeNumberOfPages(double fraction) {
 		if (fraction <= 0 || fraction > 1) {
 			throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
 		}
 
-		return (int)(totalNumPages * fraction / numberOfSlots);
+		return (int) (totalNumPages * fraction / numberOfSlots);
 	}
 
 	/**
@@ -584,13 +579,13 @@ public class MemoryManager {
 	public long roundDownToPageSizeMultiple(long numBytes) {
 		return numBytes & roundingMask;
 	}
-	
+
 
 	// ------------------------------------------------------------------------
 	//  Memory Pools
 	// ------------------------------------------------------------------------
 
-	static abstract class MemoryPool {
+	abstract static class MemoryPool {
 
 		abstract int getNumberOfAvailableMemorySegments();
 
@@ -599,13 +594,13 @@ public class MemoryManager {
 		abstract MemorySegment requestSegmentFromPool(Object owner);
 
 		abstract void returnSegmentToPool(MemorySegment segment);
-		
+
 		abstract void clear();
 	}
 
 	static final class HeapMemoryPool extends MemoryPool {
 
-		/** The collection of available memory segments */
+		/** The collection of available memory segments. */
 		private final ArrayDeque<byte[]> availableMemory;
 
 		private final int segmentSize;
@@ -613,7 +608,7 @@ public class MemoryManager {
 		public HeapMemoryPool(int numInitialSegments, int segmentSize) {
 			this.availableMemory = new ArrayDeque<byte[]>(numInitialSegments);
 			this.segmentSize = segmentSize;
-			
+
 			for (int i = 0; i < numInitialSegments; i++) {
 				this.availableMemory.add(new byte[segmentSize]);
 			}
@@ -652,10 +647,10 @@ public class MemoryManager {
 			availableMemory.clear();
 		}
 	}
-	
+
 	static final class HybridOffHeapMemoryPool extends MemoryPool {
 
-		/** The collection of available memory segments */
+		/** The collection of available memory segments. */
 		private final ArrayDeque<ByteBuffer> availableMemory;
 
 		private final int segmentSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
index d1063cc..dac044e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerConcurrentModReleaseTest.java
@@ -20,15 +20,18 @@ package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.fail;
 
+/**
+ * Validate memory release under concurrent modification exceptions.
+ */
 public class MemoryManagerConcurrentModReleaseTest {
 
 	@Test
@@ -49,33 +52,33 @@ public class MemoryManagerConcurrentModReleaseTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testConcurrentModificationWhileReleasing() {
 		try {
 			final int numSegments = 10000;
 			final int segmentSize = 4096;
-			
+
 			MemoryManager memMan = new MemoryManager(numSegments * segmentSize, 1, segmentSize, MemoryType.HEAP, true);
-			
+
 			ArrayList<MemorySegment> segs = new ArrayList<>(numSegments);
 			memMan.allocatePages(this, segs, numSegments);
-			
+
 			// start a thread that performs concurrent modifications
 			Modifier mod = new Modifier(segs);
 			Thread modRunner = new Thread(mod);
 			modRunner.start();
-			
+
 			// give the thread some time to start working
 			Thread.sleep(500);
-			
+
 			try {
 				memMan.release(segs);
 			}
 			finally {
 				mod.cancel();
 			}
-			
+
 			modRunner.join();
 		}
 		catch (Exception e) {
@@ -83,14 +86,13 @@ public class MemoryManagerConcurrentModReleaseTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private class Modifier implements Runnable {
-		
+
 		private final ArrayList<MemorySegment> toModify;
-		
+
 		private volatile boolean running = true;
 
-		
 		private Modifier(ArrayList<MemorySegment> toModify) {
 			this.toModify = toModify;
 		}
@@ -112,13 +114,13 @@ public class MemoryManagerConcurrentModReleaseTest {
 			}
 		}
 	}
-	
+
 	private class ListWithConcModExceptionOnFirstAccess<E> extends ArrayList<E> {
 
 		private static final long serialVersionUID = -1623249699823349781L;
-		
+
 		private boolean returnedIterator;
-		
+
 		@Override
 		public Iterator<E> iterator() {
 			if (returnedIterator) {
@@ -130,8 +132,7 @@ public class MemoryManagerConcurrentModReleaseTest {
 			}
 		}
 	}
-	
-	
+
 	private class ConcFailingIterator<E> implements Iterator<E> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
index 15251e9..46d0851 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerLazyAllocationTest.java
@@ -38,20 +38,19 @@ import static org.junit.Assert.fail;
  * Tests for the memory manager, in the mode where it pre-allocates all memory.
  */
 public class MemoryManagerLazyAllocationTest {
-	
+
 	private static final long RANDOM_SEED = 643196033469871L;
 
 	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
 
 	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-	
+
 	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
 
 	private MemoryManager memoryManager;
 
 	private Random random;
 
-	
 	@Before
 	public void setUp() {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, false);
@@ -72,7 +71,7 @@ public class MemoryManagerLazyAllocationTest {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
 			List<MemorySegment> segments = new ArrayList<MemorySegment>();
-			
+
 			try {
 				for (int i = 0; i < NUM_PAGES; i++) {
 					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,7 +80,7 @@ public class MemoryManagerLazyAllocationTest {
 			catch (MemoryAllocationException e) {
 				fail("Unable to allocate memory");
 			}
-			
+
 			for (MemorySegment seg : segments) {
 				this.memoryManager.release(seg);
 			}
@@ -91,21 +90,21 @@ public class MemoryManagerLazyAllocationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateAllMulti() {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
 			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-			
+
 			try {
-				for(int i = 0; i < NUM_PAGES / 2; i++) {
+				for (int i = 0; i < NUM_PAGES / 2; i++) {
 					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
 				}
 			} catch (MemoryAllocationException e) {
 				Assert.fail("Unable to allocate memory");
 			}
-			
+
 			this.memoryManager.release(segments);
 		}
 		catch (Exception e) {
@@ -113,37 +112,37 @@ public class MemoryManagerLazyAllocationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateMultipleOwners() {
-		final int NUM_OWNERS = 17;
-	
+		final int numOwners = 17;
+
 		try {
-			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-			
+			AbstractInvokable[] owners = new AbstractInvokable[numOwners];
+
 			@SuppressWarnings("unchecked")
-			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
-			
-			for (int i = 0; i < NUM_OWNERS; i++) {
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];
+
+			for (int i = 0; i < numOwners; i++) {
 				owners[i] = new DummyInvokable();
 				mems[i] = new ArrayList<MemorySegment>(64);
 			}
-			
+
 			// allocate all memory to the different owners
 			for (int i = 0; i < NUM_PAGES; i++) {
-				final int owner = this.random.nextInt(NUM_OWNERS);
+				final int owner = this.random.nextInt(numOwners);
 				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
 			}
-			
+
 			// free one owner at a time
-			for (int i = 0; i < NUM_OWNERS; i++) {
+			for (int i = 0; i < numOwners; i++) {
 				this.memoryManager.releaseAll(owners[i]);
 				owners[i] = null;
 				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
 				mems[i] = null;
-				
+
 				// check that the owner owners were not affected
-				for (int k = i+1; k < NUM_OWNERS; k++) {
+				for (int k = i + 1; k < numOwners; k++) {
 					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
 				}
 			}
@@ -153,24 +152,24 @@ public class MemoryManagerLazyAllocationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateTooMuch() {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
-			
+
 			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-			
+
 			try {
 				this.memoryManager.allocatePages(mockInvoke, 1);
 				Assert.fail("Expected MemoryAllocationException.");
 			} catch (MemoryAllocationException maex) {
 				// expected
 			}
-			
+
 			Assert.assertTrue("The previously allocated segments were not valid any more.",
 																	allMemorySegmentsValid(segs));
-			
+
 			this.memoryManager.releaseAll(mockInvoke);
 		}
 		catch (Exception e) {
@@ -178,7 +177,7 @@ public class MemoryManagerLazyAllocationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
 		for (MemorySegment seg : memSegs) {
 			if (seg.isFreed()) {
@@ -187,7 +186,7 @@ public class MemoryManagerLazyAllocationTest {
 		}
 		return true;
 	}
-	
+
 	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
 		for (MemorySegment seg : memSegs) {
 			if (!seg.isFreed()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index a20a180..6847456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -18,40 +18,39 @@
 
 package org.apache.flink.runtime.memory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.junit.Assert;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the memory manager, in the mode where it pre-allocates all memory.
  */
 public class MemoryManagerTest {
-	
+
 	private static final long RANDOM_SEED = 643196033469871L;
 
 	private static final int MEMORY_SIZE = 1024 * 1024 * 72; // 72 MiBytes
 
 	private static final int PAGE_SIZE = 1024 * 32; // 32 KiBytes
-	
+
 	private static final int NUM_PAGES = MEMORY_SIZE / PAGE_SIZE;
 
 	private MemoryManager memoryManager;
 
 	private Random random;
 
-	
 	@Before
 	public void setUp() {
 		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
@@ -72,7 +71,7 @@ public class MemoryManagerTest {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
 			List<MemorySegment> segments = new ArrayList<MemorySegment>();
-			
+
 			try {
 				for (int i = 0; i < NUM_PAGES; i++) {
 					segments.add(this.memoryManager.allocatePages(mockInvoke, 1).get(0));
@@ -81,7 +80,7 @@ public class MemoryManagerTest {
 			catch (MemoryAllocationException e) {
 				fail("Unable to allocate memory");
 			}
-			
+
 			this.memoryManager.release(segments);
 		}
 		catch (Exception e) {
@@ -89,21 +88,21 @@ public class MemoryManagerTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateAllMulti() {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
 			final List<MemorySegment> segments = new ArrayList<MemorySegment>();
-			
+
 			try {
-				for(int i = 0; i < NUM_PAGES / 2; i++) {
+				for (int i = 0; i < NUM_PAGES / 2; i++) {
 					segments.addAll(this.memoryManager.allocatePages(mockInvoke, 2));
 				}
 			} catch (MemoryAllocationException e) {
 				Assert.fail("Unable to allocate memory");
 			}
-			
+
 			this.memoryManager.release(segments);
 		}
 		catch (Exception e) {
@@ -111,37 +110,37 @@ public class MemoryManagerTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateMultipleOwners() {
-		final int NUM_OWNERS = 17;
-	
+		final int numOwners = 17;
+
 		try {
-			AbstractInvokable[] owners = new AbstractInvokable[NUM_OWNERS];
-			
+			AbstractInvokable[] owners = new AbstractInvokable[numOwners];
+
 			@SuppressWarnings("unchecked")
-			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[NUM_OWNERS];
-			
-			for (int i = 0; i < NUM_OWNERS; i++) {
+			List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];
+
+			for (int i = 0; i < numOwners; i++) {
 				owners[i] = new DummyInvokable();
 				mems[i] = new ArrayList<MemorySegment>(64);
 			}
-			
+
 			// allocate all memory to the different owners
 			for (int i = 0; i < NUM_PAGES; i++) {
-				final int owner = this.random.nextInt(NUM_OWNERS);
+				final int owner = this.random.nextInt(numOwners);
 				mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
 			}
-			
+
 			// free one owner at a time
-			for (int i = 0; i < NUM_OWNERS; i++) {
+			for (int i = 0; i < numOwners; i++) {
 				this.memoryManager.releaseAll(owners[i]);
 				owners[i] = null;
 				Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
 				mems[i] = null;
-				
+
 				// check that the owner owners were not affected
-				for (int k = i+1; k < NUM_OWNERS; k++) {
+				for (int k = i + 1; k < numOwners; k++) {
 					Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
 				}
 			}
@@ -151,24 +150,24 @@ public class MemoryManagerTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void allocateTooMuch() {
 		try {
 			final AbstractInvokable mockInvoke = new DummyInvokable();
-			
+
 			List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
-			
+
 			try {
 				this.memoryManager.allocatePages(mockInvoke, 1);
 				Assert.fail("Expected MemoryAllocationException.");
 			} catch (MemoryAllocationException maex) {
 				// expected
 			}
-			
+
 			Assert.assertTrue("The previously allocated segments were not valid any more.",
 																	allMemorySegmentsValid(segs));
-			
+
 			this.memoryManager.releaseAll(mockInvoke);
 		}
 		catch (Exception e) {
@@ -176,7 +175,7 @@ public class MemoryManagerTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private boolean allMemorySegmentsValid(List<MemorySegment> memSegs) {
 		for (MemorySegment seg : memSegs) {
 			if (seg.isFreed()) {
@@ -185,7 +184,7 @@ public class MemoryManagerTest {
 		}
 		return true;
 	}
-	
+
 	private boolean allMemorySegmentsFreed(List<MemorySegment> memSegs) {
 		for (MemorySegment seg : memSegs) {
 			if (!seg.isFreed()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c150a6f/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
index fad1b0e..3a07b44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -18,26 +18,29 @@
 
 package org.apache.flink.runtime.memory;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.Random;
-
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.core.memory.MemorySegment;
 
-import org.junit.Assert;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test reading and writing primitive types to {@link MemorySegment}.
+ */
 public class MemorySegmentSimpleTest {
-	
+
 	public static final long RANDOM_SEED = 643196033469871L;
 
 	public static final int MANAGED_MEMORY_SIZE = 1024 * 1024 * 16;
@@ -67,7 +70,7 @@ public class MemorySegmentSimpleTest {
 		this.manager.release(this.segment);
 		this.random = null;
 		this.segment = null;
-		
+
 		if (!this.manager.verifyEmpty()) {
 			Assert.fail("Not all memory has been properly released.");
 		}
@@ -430,7 +433,7 @@ public class MemorySegmentSimpleTest {
 				assertEquals(random.nextLong(), segment.getLong(i));
 			}
 		}
-		
+
 		// test unaligned offsets
 		{
 			final long seed = random.nextLong();
@@ -440,7 +443,7 @@ public class MemorySegmentSimpleTest {
 				long value = random.nextLong();
 				segment.putLong(offset, value);
 			}
-			
+
 			random.setSeed(seed);
 			for (int offset = 0; offset < PAGE_SIZE - 8; offset += random.nextInt(24) + 8) {
 				long shouldValue = random.nextLong();
@@ -547,22 +550,22 @@ public class MemorySegmentSimpleTest {
 			}
 		}
 	}
-	
+
 	@Test
 	public void testByteBufferWrapping() {
 		try {
 			MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
-			
+
 			ByteBuffer buf1 = seg.wrap(13, 47);
 			assertEquals(13, buf1.position());
 			assertEquals(60, buf1.limit());
 			assertEquals(47, buf1.remaining());
-			
+
 			ByteBuffer buf2 = seg.wrap(500, 267);
 			assertEquals(500, buf2.position());
 			assertEquals(767, buf2.limit());
 			assertEquals(267, buf2.remaining());
-			
+
 			ByteBuffer buf3 = seg.wrap(0, 1024);
 			assertEquals(0, buf3.position());
 			assertEquals(1024, buf3.limit());