You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:40 UTC

[7/7] flink git commit: [runtime] Remove old redundant classes - StringRecord -> StringValue - IntegerRecord -> IntValue - FileRecord -> obsolete

[runtime] Remove old redundant classes
  - StringRecord -> StringValue
  - IntegerRecord -> IntValue
  - FileRecord -> obsolete


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12c555f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12c555f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12c555f5

Branch: refs/heads/master
Commit: 12c555f5f31aa6a14d39d2a96bcf7412ec17495f
Parents: adc8530
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 6 16:27:25 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 6 16:27:25 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/core/fs/Path.java     |  28 +-
 .../flink/core/io/LocatableInputSplit.java      |   7 +-
 .../org/apache/flink/core/io/StringRecord.java  | 701 -------------------
 .../java/org/apache/flink/util/StringUtils.java |  17 +-
 .../record/io/ExternalProcessInputSplit.java    |   7 +-
 .../runtime/event/task/StringTaskEvent.java     |  26 +-
 .../task/IterationSynchronizationSinkTask.java  |  10 +-
 .../impl/types/ProfilingDataContainer.java      |  18 +-
 .../apache/flink/runtime/types/FileRecord.java  | 128 ----
 .../flink/runtime/types/IntegerRecord.java      | 100 ---
 .../runtime/util/SerializableArrayList.java     |  52 +-
 .../flink/runtime/util/SerializableHashMap.java |  30 +-
 .../flink/runtime/util/SerializableHashSet.java |  45 +-
 .../IOManagerPerformanceBenchmark.java          |  16 +-
 .../io/network/DefaultChannelSelectorTest.java  |   7 +-
 .../SlotCountExceedingParallelismTest.java      |  16 +-
 .../ScheduleOrUpdateConsumersTest.java          |  16 +-
 .../flink/runtime/types/StringRecordTest.java   | 111 ---
 .../apache/flink/runtime/types/TypeTest.java    |  88 ---
 .../apache/flink/runtime/jobmanager/Tasks.scala |  63 +-
 20 files changed, 140 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 30a2a65..5a15d6a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -28,7 +28,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.OperatingSystem;
@@ -473,20 +472,19 @@ public class Path implements IOReadableWritable, Serializable {
 
 		final boolean isNotNull = in.readBoolean();
 		if (isNotNull) {
-			final String scheme = StringRecord.readString(in);
-			final String userInfo = StringRecord.readString(in);
-			final String host = StringRecord.readString(in);
+			final String scheme = StringUtils.readNullableString(in);
+			final String userInfo = StringUtils.readNullableString(in);
+			final String host = StringUtils.readNullableString(in);
 			final int port = in.readInt();
-			final String path = StringRecord.readString(in);
-			final String query = StringRecord.readString(in);
-			final String fragment = StringRecord.readString(in);
+			final String path = StringUtils.readNullableString(in);
+			final String query = StringUtils.readNullableString(in);
+			final String fragment = StringUtils.readNullableString(in);
 
 			try {
 				uri = new URI(scheme, userInfo, host, port, path, query, fragment);
 			} catch (URISyntaxException e) {
-				throw new IOException("Error reconstructing URI: " + StringUtils.stringifyException(e));
+				throw new IOException("Error reconstructing URI", e);
 			}
-
 		}
 	}
 
@@ -498,13 +496,13 @@ public class Path implements IOReadableWritable, Serializable {
 			out.writeBoolean(false);
 		} else {
 			out.writeBoolean(true);
-			StringRecord.writeString(out, uri.getScheme());
-			StringRecord.writeString(out, uri.getUserInfo());
-			StringRecord.writeString(out, uri.getHost());
+			StringUtils.writeNullableString(uri.getScheme(), out);
+			StringUtils.writeNullableString(uri.getUserInfo(), out);
+			StringUtils.writeNullableString(uri.getHost(), out);
 			out.writeInt(uri.getPort());
-			StringRecord.writeString(out, uri.getPath());
-			StringRecord.writeString(out, uri.getQuery());
-			StringRecord.writeString(out, uri.getFragment());
+			StringUtils.writeNullableString(uri.getPath(), out);
+			StringUtils.writeNullableString(uri.getQuery(), out);
+			StringUtils.writeNullableString(uri.getFragment(), out);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index 9bf23a6..a373639 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
 
 /**
  * A locatable input split is an input split referring to input data which is located on one or more hosts.
@@ -90,8 +91,8 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable {
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.splitNumber);
 		out.writeInt(this.hostnames.length);
-		for (int i = 0; i < this.hostnames.length; i++) {
-			StringRecord.writeString(out, this.hostnames[i]);
+		for (String hostname : this.hostnames) {
+			StringUtils.writeNullableString(hostname, out);
 		}
 	}
 
@@ -105,7 +106,7 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable {
 		} else {
 			this.hostnames = new String[numHosts];
 			for (int i = 0; i < numHosts; i++) {
-				this.hostnames[i] = StringRecord.readString(in);
+				this.hostnames[i] = StringUtils.readNullableString(in);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java b/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
deleted file mode 100644
index d56b484..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/io/StringRecord.java
+++ /dev/null
@@ -1,701 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
-package org.apache.flink.core.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-import java.nio.charset.CodingErrorAction;
-import java.nio.charset.MalformedInputException;
-import java.text.CharacterIterator;
-import java.text.StringCharacterIterator;
-import java.util.Arrays;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-
-/**
- * This class stores text using standard UTF8 encoding. It provides methods to
- * serialize, deserialize, and compare texts at byte level. The type of length
- * is integer and is serialized using zero-compressed format.
- * <p>
- * In addition, it provides methods for string traversal without converting the byte array to a string.
- * <p>
- * Also includes utilities for serializing/deserialing a string, coding/decoding a string, checking if a byte array
- * contains valid UTF8 code, calculating the length of an encoded string.
- */
-public class StringRecord implements Value {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
-		protected CharsetEncoder initialValue() {
-			return Charset.forName("UTF-8").newEncoder().onMalformedInput(CodingErrorAction.REPORT)
-				.onUnmappableCharacter(CodingErrorAction.REPORT);
-		}
-	};
-
-	private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
-		protected CharsetDecoder initialValue() {
-			return Charset.forName("UTF-8").newDecoder().onMalformedInput(CodingErrorAction.REPORT)
-				.onUnmappableCharacter(CodingErrorAction.REPORT);
-		}
-	};
-
-	/**
-	 * Cache the hash code for the encapsulated string.
-	 **/
-	private int hash = 0;
-
-	private static final byte[] EMPTY_BYTES = new byte[0];
-
-	private byte[] bytes;
-
-	private int length;
-
-	public StringRecord() {
-		this.bytes = EMPTY_BYTES;
-		this.hash = 0;
-	}
-
-	/**
-	 * Construct from a string.
-	 */
-	public StringRecord(final String string) {
-		set(string);
-	}
-
-	/** Construct from another text. */
-	public StringRecord(final StringRecord utf8) {
-		set(utf8);
-	}
-
-	/**
-	 * Construct from a byte array.
-	 */
-	public StringRecord(final byte[] utf8) {
-		set(utf8);
-	}
-
-	/**
-	 * Returns the raw bytes; however, only data up to {@link #getLength()} is
-	 * valid.
-	 */
-	public byte[] getBytes() {
-		return bytes;
-	}
-
-	/**
-	 * Returns the Unicode Scalar Value (32-bit integer value) for the character
-	 * at <code>position</code>. Note that this method avoids using the
-	 * converter or doing String instantiation
-	 * 
-	 * @return the Unicode scalar value at position or -1 if the position is
-	 *         invalid or points to a trailing byte
-	 */
-	public int charAt(final int position) {
-		if (position > this.length) {
-			return -1; // too long
-		}
-		if (position < 0) {
-			return -1;
-		}
-
-		final ByteBuffer bb = (ByteBuffer) ByteBuffer.wrap(this.bytes).position(position);
-		return bytesToCodePoint(bb.slice());
-	}
-
-	public int find(final String what) {
-		return find(what, 0);
-	}
-
-	/**
-	 * Finds any occurrence of <code>what</code> in the backing buffer, starting
-	 * as position <code>start</code>. The starting position is measured in
-	 * bytes and the return value is in terms of byte position in the buffer.
-	 * The backing buffer is not converted to a string for this operation.
-	 * 
-	 * @return byte position of the first occurence of the search string in the
-	 *         UTF-8 buffer or -1 if not found
-	 */
-	public int find(final String what, final int start) {
-		try {
-			final ByteBuffer src = ByteBuffer.wrap(this.bytes, 0, this.length);
-			final ByteBuffer tgt = encode(what);
-			final byte b = tgt.get();
-			src.position(start);
-
-			while (src.hasRemaining()) {
-				if (b == src.get()) { // matching first byte
-					src.mark(); // save position in loop
-					tgt.mark(); // save position in target
-					boolean found = true;
-					final int pos = src.position() - 1;
-					while (tgt.hasRemaining()) {
-						if (!src.hasRemaining()) { // src expired first
-							tgt.reset();
-							src.reset();
-							found = false;
-							break;
-						}
-						if (!(tgt.get() == src.get())) {
-							tgt.reset();
-							src.reset();
-							found = false;
-							break; // no match
-						}
-					}
-					if (found) {
-						return pos;
-					}
-				}
-			}
-			return -1; // not found
-		} catch (CharacterCodingException e) {
-			// can't get here
-			e.printStackTrace();
-			return -1;
-		}
-	}
-
-	/**
-	 * Set to contain the contents of a string.
-	 */
-	public void set(final String string) {
-		try {
-			final ByteBuffer bb = encode(string, true);
-			this.bytes = bb.array();
-			this.length = bb.limit();
-			this.hash = 0;
-		} catch (CharacterCodingException e) {
-			throw new RuntimeException("Should not have happened " + e.toString());
-		}
-	}
-
-	/**
-	 * Set to a utf8 byte array
-	 */
-	public void set(final byte[] utf8) {
-		set(utf8, 0, utf8.length);
-	}
-
-	/** copy a text. */
-	public void set(final StringRecord other) {
-		set(other.getBytes(), 0, other.getLength());
-	}
-
-	/** Returns the number of bytes in the byte array */
-	public int getLength() {
-		return length;
-	}
-
-	/**
-	 * Set the Text to range of bytes
-	 * 
-	 * @param utf8
-	 *        the data to copy from
-	 * @param start
-	 *        the first position of the new string
-	 * @param len
-	 *        the number of bytes of the new string
-	 */
-	public void set(final byte[] utf8, final int start, final int len) {
-		setCapacity(len, false);
-		System.arraycopy(utf8, start, bytes, 0, len);
-		this.length = len;
-		this.hash = 0;
-	}
-
-	/**
-	 * Append a range of bytes to the end of the given text
-	 * 
-	 * @param utf8
-	 *        the data to copy from
-	 * @param start
-	 *        the first position to append from utf8
-	 * @param len
-	 *        the number of bytes to append
-	 */
-	public void append(final byte[] utf8, final int start, final int len) {
-		setCapacity(length + len, true);
-		System.arraycopy(utf8, start, bytes, length, len);
-		this.length += len;
-		this.hash = 0;
-	}
-
-	/**
-	 * Clear the string to empty.
-	 */
-	public void clear() {
-		this.length = 0;
-		this.hash = 0;
-	}
-
-	/*
-	 * Sets the capacity of this Text object to <em>at least</em>
-	 * <code>len</code> bytes. If the current buffer is longer, then the
-	 * capacity and existing content of the buffer are unchanged. If
-	 * <code>len</code> is larger than the current capacity, the Text object's
-	 * capacity is increased to match.
-	 * @param len the number of bytes we need
-	 * @param keepData should the old data be kept
-	 */
-	private void setCapacity(final int len, final boolean keepData) {
-		if (this.bytes == null || this.bytes.length < len) {
-			final byte[] newBytes = new byte[len];
-			if (this.bytes != null && keepData) {
-				System.arraycopy(this.bytes, 0, newBytes, 0, this.length);
-			}
-			this.bytes = newBytes;
-		}
-	}
-
-	/**
-	 * Convert text back to string
-	 * 
-	 * @see java.lang.Object#toString()
-	 */
-	public String toString() {
-		try {
-			return decode(bytes, 0, length);
-		} catch (CharacterCodingException e) {
-			throw new RuntimeException("Should not have happened " + e.toString());
-		}
-	}
-
-	/**
-	 * deserialize
-	 * @param in
-	 */
-	public void read(final DataInputView in) throws IOException {
-		final int newLength = in.readInt();
-		setCapacity(newLength, false);
-		in.readFully(this.bytes, 0, newLength);
-		this.length = newLength;
-		this.hash = 0;
-	}
-
-	/** Skips over one Text in the input. */
-	public static void skip(final DataInput in) throws IOException {
-		final int length = in.readInt();
-		skipFully(in, length);
-	}
-
-	public static void skipFully(final DataInput in, final int len) throws IOException {
-		int total = 0;
-		int cur = 0;
-
-		while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) {
-			total += cur;
-		}
-
-		if (total < len) {
-			throw new IOException("Not able to skip " + len + " bytes, possibly " + "due to end of input.");
-		}
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		out.writeInt(this.length);
-		out.write(this.bytes, 0, this.length);
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof StringRecord)) {
-			return false;
-		}
-
-		final StringRecord sr = (StringRecord) obj;
-		if (this.length != sr.getLength()) {
-			return false;
-		}
-
-		if (this.bytes.length == this.length && sr.bytes.length == sr.length) {
-
-			return Arrays.equals(this.bytes, sr.bytes);
-
-		} else {
-
-			for (int i = 0; i < this.length; ++i) {
-				if (this.bytes[i] != sr.bytes[i]) {
-					return false;
-				}
-			}
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		int h = this.hash;
-		if (h == 0 && this.length > 0) {
-			int off = 0;
-			byte[] val = this.bytes;
-			int len = this.length;
-
-			for (int i = 0; i < len; i++) {
-				h = 31 * h + val[off++];
-			}
-			this.hash = h;
-		}
-		return h;
-
-	}
-
-	// / STATIC UTILITIES FROM HERE DOWN
-	/**
-	 * Converts the provided byte array to a String using the UTF-8 encoding. If
-	 * the input is malformed, replace by a default value.
-	 */
-	public static String decode(final byte[] utf8) throws CharacterCodingException {
-		return decode(ByteBuffer.wrap(utf8), true);
-	}
-
-	public static String decode(final byte[] utf8, final int start, final int length) throws CharacterCodingException {
-		return decode(ByteBuffer.wrap(utf8, start, length), true);
-	}
-
-	/**
-	 * Converts the provided byte array to a String using the UTF-8 encoding. If <code>replace</code> is true, then
-	 * malformed input is replaced with the
-	 * substitution character, which is U+FFFD. Otherwise the method throws a
-	 * MalformedInputException.
-	 */
-	public static String decode(final byte[] utf8, final int start, final int length, final boolean replace)
-			throws CharacterCodingException {
-		return decode(ByteBuffer.wrap(utf8, start, length), replace);
-	}
-
-	private static String decode(final ByteBuffer utf8, final boolean replace) throws CharacterCodingException {
-		final CharsetDecoder decoder = DECODER_FACTORY.get();
-		if (replace) {
-			decoder.onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE);
-			decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
-		}
-		final String str = decoder.decode(utf8).toString();
-		// set decoder back to its default value: REPORT
-		if (replace) {
-			decoder.onMalformedInput(CodingErrorAction.REPORT);
-			decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
-		}
-		return str;
-	}
-
-	/**
-	 * Converts the provided String to bytes using the UTF-8 encoding. If the
-	 * input is malformed, invalid chars are replaced by a default value.
-	 * 
-	 * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
-	 *         ByteBuffer.limit()
-	 */
-
-	public static ByteBuffer encode(final String string) throws CharacterCodingException {
-		return encode(string, true);
-	}
-
-	/**
-	 * Converts the provided String to bytes using the UTF-8 encoding. If <code>replace</code> is true, then malformed
-	 * input is replaced with the
-	 * substitution character, which is U+FFFD. Otherwise the method throws a
-	 * MalformedInputException.
-	 * 
-	 * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
-	 *         ByteBuffer.limit()
-	 */
-	public static ByteBuffer encode(final String string, final boolean replace) throws CharacterCodingException {
-
-		final CharsetEncoder encoder = ENCODER_FACTORY.get();
-		if (replace) {
-			encoder.onMalformedInput(CodingErrorAction.REPLACE);
-			encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
-		}
-		ByteBuffer bytes = encoder.encode(CharBuffer.wrap(string.toCharArray()));
-		if (replace) {
-			encoder.onMalformedInput(CodingErrorAction.REPORT);
-			encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
-		}
-		return bytes;
-	}
-
-	/**
-	 * Read a UTF8 encoded string from in
-	 */
-	public static String readString(final DataInput in) throws IOException {
-
-		if (in.readBoolean()) {
-			final int length = in.readInt();
-			if (length < 0) {
-				throw new IOException("length of StringRecord is " + length);
-			}
-
-			final byte[] bytes = new byte[length];
-			in.readFully(bytes, 0, length);
-			return decode(bytes);
-		}
-
-		return null;
-	}
-
-	/**
-	 * Write a UTF8 encoded string to out
-	 */
-	public static int writeString(final DataOutput out, final String s) throws IOException {
-
-		int length = 0;
-
-		if (s == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			final ByteBuffer bytes = encode(s);
-			length = bytes.limit();
-			out.writeInt(length);
-			out.write(bytes.array(), 0, length);
-		}
-		return length;
-	}
-
-	// //// states for validateUTF8
-
-	private static final int LEAD_BYTE = 0;
-
-	private static final int TRAIL_BYTE_1 = 1;
-
-	private static final int TRAIL_BYTE = 2;
-
-	/**
-	 * Check if a byte array contains valid utf-8
-	 * 
-	 * @param utf8
-	 *        byte array
-	 * @throws MalformedInputException
-	 *         if the byte array contains invalid utf-8
-	 */
-	public static void validateUTF8(final byte[] utf8) throws MalformedInputException {
-		validateUTF8(utf8, 0, utf8.length);
-	}
-
-	/**
-	 * Check to see if a byte array is valid utf-8
-	 * 
-	 * @param utf8
-	 *        the array of bytes
-	 * @param start
-	 *        the offset of the first byte in the array
-	 * @param len
-	 *        the length of the byte sequence
-	 * @throws MalformedInputException
-	 *         if the byte array contains invalid bytes
-	 */
-	public static void validateUTF8(final byte[] utf8, final int start, final int len) throws MalformedInputException {
-		int count = start;
-		int leadByte = 0;
-		int length = 0;
-		int state = LEAD_BYTE;
-		while (count < start + len) {
-			final int aByte = ((int) utf8[count] & 0xFF);
-
-			switch (state) {
-			case LEAD_BYTE:
-				leadByte = aByte;
-				length = bytesFromUTF8[aByte];
-
-				switch (length) {
-				case 0: // check for ASCII
-					if (leadByte > 0x7F) {
-						throw new MalformedInputException(count);
-					}
-					break;
-				case 1:
-					if (leadByte < 0xC2 || leadByte > 0xDF) {
-						throw new MalformedInputException(count);
-					}
-					state = TRAIL_BYTE_1;
-					break;
-				case 2:
-					if (leadByte < 0xE0 || leadByte > 0xEF) {
-						throw new MalformedInputException(count);
-					}
-					state = TRAIL_BYTE_1;
-					break;
-				case 3:
-					if (leadByte < 0xF0 || leadByte > 0xF4) {
-						throw new MalformedInputException(count);
-					}
-					state = TRAIL_BYTE_1;
-					break;
-				default:
-					// too long! Longest valid UTF-8 is 4 bytes (lead + three)
-					// or if < 0 we got a trail byte in the lead byte position
-					throw new MalformedInputException(count);
-				} // switch (length)
-				break;
-
-			case TRAIL_BYTE_1:
-				if (leadByte == 0xF0 && aByte < 0x90) {
-					throw new MalformedInputException(count);
-				}
-				if (leadByte == 0xF4 && aByte > 0x8F) {
-					throw new MalformedInputException(count);
-				}
-				if (leadByte == 0xE0 && aByte < 0xA0) {
-					throw new MalformedInputException(count);
-				}
-				if (leadByte == 0xED && aByte > 0x9F) {
-					throw new MalformedInputException(count);
-				}
-				// falls through to regular trail-byte test!!
-			case TRAIL_BYTE:
-				if (aByte < 0x80 || aByte > 0xBF) {
-					throw new MalformedInputException(count);
-				}
-				if (--length == 0) {
-					state = LEAD_BYTE;
-				} else {
-					state = TRAIL_BYTE;
-				}
-				break;
-			default:
-				break;
-			} // switch (state)
-			count++;
-		}
-	}
-
-	/**
-	 * Magic numbers for UTF-8. These are the number of bytes that <em>follow</em> a given lead byte. Trailing bytes
-	 * have the value -1. The
-	 * values 4 and 5 are presented in this table, even though valid UTF-8
-	 * cannot include the five and six byte sequences.
-	 */
-	static final int[] bytesFromUTF8 = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-		0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-		0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-		0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-		0,
-		0,
-		0,
-		// trail bytes
-		-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-		-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-		-1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-		1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5,
-		5 };
-
-	/**
-	 * Returns the next code point at the current position in the buffer. The
-	 * buffer's position will be incremented. Any mark set on this buffer will
-	 * be changed by this method!
-	 */
-	public static int bytesToCodePoint(final ByteBuffer bytes) {
-		bytes.mark();
-		final byte b = bytes.get();
-		bytes.reset();
-		final int extraBytesToRead = bytesFromUTF8[(b & 0xFF)];
-		if (extraBytesToRead < 0) {
-			return -1; // trailing byte!
-		}
-		int ch = 0;
-
-		switch (extraBytesToRead) {
-		case 5:
-			ch += (bytes.get() & 0xFF);
-			ch <<= 6; /* remember, illegal UTF-8 */
-		case 4:
-			ch += (bytes.get() & 0xFF);
-			ch <<= 6; /* remember, illegal UTF-8 */
-		case 3:
-			ch += (bytes.get() & 0xFF);
-			ch <<= 6;
-		case 2:
-			ch += (bytes.get() & 0xFF);
-			ch <<= 6;
-		case 1:
-			ch += (bytes.get() & 0xFF);
-			ch <<= 6;
-		case 0:
-			ch += (bytes.get() & 0xFF);
-		default:
-			break;
-
-		}
-		ch -= offsetsFromUTF8[extraBytesToRead];
-
-		return ch;
-	}
-
-	static final int[] offsetsFromUTF8 = { 0x00000000, 0x00003080, 0x000E2080, 0x03C82080, 0xFA082080, 0x82082080 };
-
-	/**
-	 * For the given string, returns the number of UTF-8 bytes required to
-	 * encode the string.
-	 * 
-	 * @param string
-	 *        text to encode
-	 * @return number of UTF-8 bytes required to encode
-	 */
-	public static int utf8Length(final String string) {
-		final CharacterIterator iter = new StringCharacterIterator(string);
-		char ch = iter.first();
-		int size = 0;
-		while (ch != CharacterIterator.DONE) {
-			if ((ch >= 0xD800) && (ch < 0xDC00)) {
-				// surrogate pair?
-				char trail = iter.next();
-				if ((trail > 0xDBFF) && (trail < 0xE000)) {
-					// valid pair
-					size += 4;
-				} else {
-					// invalid pair
-					size += 3;
-					iter.previous(); // rewind one
-				}
-			} else if (ch < 0x80) {
-				size++;
-			} else if (ch < 0x800) {
-				size += 2;
-			} else {
-				// ch < 0x10000, that is, the largest char value
-				size += 3;
-			}
-			ch = iter.next();
-		}
-		return size;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 0c4194a..9c7bd8b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -134,19 +134,16 @@ public final class StringUtils {
 				sb.append("&lt;");
 			} else if (c == '&') {
 				sb.append("&amp;");
+			} else if (c < ' ') {
+				// Unreadable throw away
 			} else {
-				if (c < ' ') {
-					// Unreadable throw away
-				} else {
-					sb.append(c);
-				}
+				sb.append(c);
 			}
 		}
 
 		return sb.toString();
 	}
-	
-	
+
 	/**
 	 * This method calls {@link Object#toString()} on the given object, unless the
 	 * object is an array. In that case, it will use the {@link #arrayToString(Object)}
@@ -156,7 +153,7 @@ public final class StringUtils {
 	 * @param o The object for which to create the string representation.
 	 * @return The string representation of the object.
 	 */
-	public static final String arrayAwareToString(Object o) {
+	public static String arrayAwareToString(Object o) {
 		if (o == null) {
 			return "null";
 		}
@@ -175,7 +172,7 @@ public final class StringUtils {
 	 * @return The string representation of the array.
 	 * @throws IllegalArgumentException If the given object is no array.
 	 */
-	public static final String arrayToString(Object array) {
+	public static String arrayToString(Object array) {
 		if (array == null) {
 			throw new NullPointerException();
 		}
@@ -224,7 +221,7 @@ public final class StringUtils {
 	 * @param str The string in which to replace the control characters.
 	 * @return The string with the replaced characters.
 	 */
-	public static final String showControlCharacters(String str) {
+	public static String showControlCharacters(String str) {
 		int len = str.length();
 		StringBuilder sb = new StringBuilder();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
index 7b5ac96..9f0b345 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
@@ -16,15 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.record.io;
 
 import java.io.IOException;
 
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
 
 /**
  * The ExternalProcessInputSplit contains all informations for {@link org.apache.flink.api.common.io.InputFormat} that read their data from external processes.
@@ -68,12 +67,12 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
 	@Override
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
-		this.extProcessCommand = StringRecord.readString(in);
+		this.extProcessCommand = StringUtils.readNullableString(in);
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
-		StringRecord.writeString(out, this.extProcessCommand);
+		StringUtils.writeNullableString(this.extProcessCommand, out);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index 7b6b3d5..87f2e91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -20,26 +20,24 @@ package org.apache.flink.runtime.event.task;
 
 import java.io.IOException;
 
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
 
 /**
  * This class provides a simple implementation of an event that holds a string value.
- * 
  */
 public class StringTaskEvent extends TaskEvent {
 
 	/**
 	 * The string encapsulated by this event.
 	 */
-	private String message = null;
+	private String message;
 
 	/**
 	 * The default constructor implementation. It should only be used for deserialization.
 	 */
-	public StringTaskEvent() {
-	}
+	public StringTaskEvent() {}
 
 	/**
 	 * Constructs a new string task event with the given string message.
@@ -62,22 +60,18 @@ public class StringTaskEvent extends TaskEvent {
 
 
 	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		StringRecord.writeString(out, this.message);
+	public void write(DataOutputView out) throws IOException {
+		StringUtils.writeNullableString(this.message, out);
 	}
 
-
 	@Override
 	public void read(final DataInputView in) throws IOException {
-
-		this.message = StringRecord.readString(in);
+		this.message = StringUtils.readNullableString(in);
 	}
 
 
 	@Override
 	public int hashCode() {
-
 		if (this.message == null) {
 			return 0;
 		}
@@ -88,19 +82,13 @@ public class StringTaskEvent extends TaskEvent {
 
 	@Override
 	public boolean equals(final Object obj) {
-
 		if (!(obj instanceof StringTaskEvent)) {
 			return false;
 		}
 
 		final StringTaskEvent ste = (StringTaskEvent) obj;
-
 		if (this.message == null) {
-			if (ste.getString() == null) {
-				return true;
-			}
-
-			return false;
+			return ste.getString() == null;
 		}
 
 		return this.message.equals(ste.getString());

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index b222cb5..5eccd7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.types.IntValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.types.Value;
 
 import com.google.common.base.Preconditions;
@@ -52,7 +52,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 
 	private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
 
-	private MutableRecordReader<IntegerRecord> headEventReader;
+	private MutableRecordReader<IntValue> headEventReader;
 	
 	private SyncEventHandler eventHandler;
 
@@ -73,7 +73,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	
 	@Override
 	public void registerInputOutput() {
-		this.headEventReader = new MutableRecordReader<IntegerRecord>(getEnvironment().getInputGate(0));
+		this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
 	}
 
 	@Override
@@ -101,7 +101,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 				getEnvironment().getUserClassLoader());
 		headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
 
-		IntegerRecord dummy = new IntegerRecord();
+		IntValue dummy = new IntValue();
 		
 		while (!terminationRequested()) {
 
@@ -182,7 +182,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 		return false;
 	}
 
-	private void readHeadEventChannel(IntegerRecord rec) throws IOException {
+	private void readHeadEventChannel(IntValue rec) throws IOException {
 		// reset the handler
 		eventHandler.resetEndOfSuperstep();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
index 60334db..b233b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/ProfilingDataContainer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.profiling.impl.types;
 
 import java.io.IOException;
@@ -25,7 +24,6 @@ import java.util.Iterator;
 import java.util.Queue;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
@@ -53,20 +51,20 @@ public class ProfilingDataContainer implements IOReadableWritable {
 
 		final int numberOfRecords = in.readInt();
 		for (int i = 0; i < numberOfRecords; i++) {
-			final String className = StringRecord.readString(in);
-			Class<? extends InternalProfilingData> clazz = null;
+			final String className = StringUtils.readNullableString(in);
 
+			Class<? extends InternalProfilingData> clazz;
 			try {
 				clazz = (Class<? extends InternalProfilingData>) Class.forName(className);
 			} catch (Exception e) {
-				throw new IOException(StringUtils.stringifyException(e));
+				throw new IOException(e);
 			}
 
-			InternalProfilingData profilingData = null;
+			InternalProfilingData profilingData ;
 			try {
 				profilingData = clazz.newInstance();
 			} catch (Exception e) {
-				throw new IOException(StringUtils.stringifyException(e));
+				throw new IOException(e);
 			}
 
 			// Restore internal state
@@ -94,10 +92,8 @@ public class ProfilingDataContainer implements IOReadableWritable {
 		// Write the number of records
 		out.writeInt(this.queuedProfilingData.size());
 		// Write the records themselves
-		final Iterator<InternalProfilingData> iterator = this.queuedProfilingData.iterator();
-		while (iterator.hasNext()) {
-			final InternalProfilingData profilingData = iterator.next();
-			StringRecord.writeString(out, profilingData.getClass().getName());
+		for (InternalProfilingData profilingData : this.queuedProfilingData) {
+			StringUtils.writeNullableString(profilingData.getClass().getName(), out);
 			profilingData.write(out);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
deleted file mode 100644
index 473596a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/types/FileRecord.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class FileRecord implements IOReadableWritable {
-
-	private String fileName;
-
-	private static final byte[] EMPTY_BYTES = new byte[0];
-
-	private byte[] bytes;
-
-	public FileRecord() {
-		this.bytes = EMPTY_BYTES;
-		fileName = "empty";
-	}
-
-	public FileRecord(final String fileName) {
-		this.bytes = EMPTY_BYTES;
-		this.fileName = fileName;
-	}
-
-	public void setFileName(final String fileName) {
-		this.fileName = fileName;
-	}
-
-	public String getFileName() {
-		return this.fileName;
-	}
-
-	public byte[] getDataBuffer() {
-		return this.bytes;
-	}
-
-	/**
-	 * Append a range of bytes to the end of the given data.
-	 * 
-	 * @param data
-	 *        the data to copy from
-	 * @param start
-	 *        the first position to append from data
-	 * @param len
-	 *        the number of bytes to append
-	 */
-	public void append(final byte[] data, final int start, final int len) {
-		final int oldLength = this.bytes.length;
-		setCapacity(this.bytes.length + len, true);
-		System.arraycopy(data, start, this.bytes, oldLength, len);
-	}
-
-	private void setCapacity(final int len, final boolean keepData) {
-
-		if (this.bytes == null || this.bytes.length < len) {
-			final byte[] newBytes = new byte[len];
-			if (this.bytes != null && keepData) {
-				System.arraycopy(this.bytes, 0, newBytes, 0, this.bytes.length);
-			}
-			this.bytes = newBytes;
-		}
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		this.fileName = StringRecord.readString(in);
-
-		final int newLength = in.readInt();
-		this.bytes = new byte[newLength];
-		in.readFully(this.bytes, 0, newLength);
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		StringRecord.writeString(out, fileName);
-		out.writeInt(this.bytes.length);
-		out.write(this.bytes, 0, this.bytes.length);
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof FileRecord)) {
-			return false;
-		}
-
-		final FileRecord fr = (FileRecord) obj;
-
-		if (this.bytes.length != fr.bytes.length) {
-			return false;
-		}
-
-		return Arrays.equals(this.bytes, fr.bytes);
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		return (int) ((11L * this.bytes.length) % Integer.MAX_VALUE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java b/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
deleted file mode 100644
index cf35e29..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/types/IntegerRecord.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class represents record for integer values.
- */
-public class IntegerRecord implements IOReadableWritable {
-
-	/**
-	 * The integer value represented by the record.
-	 */
-	private int value = 0;
-
-	/**
-	 * Constructs a new integer record with the given integer value.
-	 * 
-	 * @param value
-	 *        the integer value this record should wrap up
-	 */
-	public IntegerRecord(final int value) {
-		this.value = value;
-	}
-
-	/**
-	 * Constructs an empty integer record (Mainly used for
-	 * serialization, do not call this constructor in your program).
-	 */
-	public IntegerRecord() {}
-
-	/**
-	 * Returns the value of this integer record.
-	 * 
-	 * @return the value of this integer record
-	 */
-	public int getValue() {
-		return this.value;
-	}
-
-	/**
-	 * Set the value of this integer record.
-	 * 
-	 * @param value
-	 *        the new value for this integer record
-	 */
-	public void setValue(final int value) {
-		this.value = value;
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		// Simply read the value from the stream
-		this.value = in.readInt();
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		// Simply write the value to the stream
-		out.writeInt(this.value);
-	}
-
-	@Override
-	public boolean equals(final Object obj) {
-		if (!(obj instanceof IntegerRecord)) {
-			return false;
-		}
-
-		final IntegerRecord ir = (IntegerRecord) obj;
-		return (this.value == ir.value);
-	}
-
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
index e28fa17..8793691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableArrayList.java
@@ -16,34 +16,25 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
 
 /**
  * This class extends a standard {@link java.util.ArrayList} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, array lists of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
- * 
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
+ *
  * @param <E>
  *        the type of object stored inside this array list
  */
 public class SerializableArrayList<E extends IOReadableWritable> extends ArrayList<E> implements IOReadableWritable {
 
-	/**
-	 * Generated serial version UID.
-	 */
 	private static final long serialVersionUID = 8196856588290198537L;
 
 	/**
@@ -63,44 +54,38 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
 		super(initialCapacity);
 	}
 
-
 	@Override
 	public void write(final DataOutputView out) throws IOException {
 
 		out.writeInt(size());
-		final Iterator<E> it = iterator();
-		while (it.hasNext()) {
-
-			final E element = it.next();
+		for (E element : this) {
 			// Write out type
-			StringRecord.writeString(out, element.getClass().getName());
+			StringUtils.writeNullableString(element.getClass().getName(), out);
 			// Write out element itself
 			element.write(out);
 		}
 	}
 
-
-	@SuppressWarnings("unchecked")
 	@Override
 	public void read(final DataInputView in) throws IOException {
-
 		// Make sure the list is empty
 		clear();
+
 		final int numberOfElements = in.readInt();
 		for (int i = 0; i < numberOfElements; i++) {
-			final String elementType = StringRecord.readString(in);
-			Class<E> clazz = null;
-			try {
-				clazz = (Class<E>) Class.forName(elementType);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			}
+			final String elementType = StringUtils.readNullableString(in);
 
-			E element = null;
+			E element;
 			try {
+				@SuppressWarnings("unchecked")
+				Class<E> clazz = (Class<E>) Class.forName(elementType);
 				element = clazz.newInstance();
-			} catch (Exception e) {
-				throw new IOException(StringUtils.stringifyException(e));
+			}
+			catch (ClassNotFoundException e) {
+				throw new IOException(e);
+			}
+			catch (Exception e) {
+				throw new IOException(e);
 			}
 
 			element.read(in);
@@ -108,13 +93,8 @@ public class SerializableArrayList<E extends IOReadableWritable> extends ArrayLi
 		}
 	}
 
-
 	@Override
 	public boolean equals(Object obj) {
-		if (!(obj instanceof SerializableArrayList<?>)) {
-			return false;
-		}
-
-		return (obj instanceof SerializableArrayList) && super.equals(obj);
+		return obj instanceof SerializableArrayList<?> && super.equals(obj);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
index b2121a7..2744fe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashMap.java
@@ -16,26 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
 
 /**
  * This class extends a standard {@link java.util.HashMap} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, hash maps of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
  * 
  * @param <K>
  *        the type of the key used in this hash map
@@ -56,16 +50,14 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
 		
 		out.writeInt(size());
 
-		final Iterator<Map.Entry<K, V>> it = entrySet().iterator();
-
-		while (it.hasNext()) {
-
-			final Map.Entry<K, V> entry = it.next();
+		for (Map.Entry<K, V> entry : entrySet()) {
 			final K key = entry.getKey();
 			final V value = entry.getValue();
-			StringRecord.writeString(out, key.getClass().getName());
+
+			StringUtils.writeNullableString(key.getClass().getName(), out);
 			key.write(out);
-			StringRecord.writeString(out, value.getClass().getName());
+
+			StringUtils.writeNullableString(value.getClass().getName(), out);
 			value.write(out);
 		}
 	}
@@ -79,8 +71,8 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
 
 		for (int i = 0; i < numberOfMapEntries; i++) {
 
-			final String keyType = StringRecord.readString(in);
-			Class<K> keyClass = null;
+			final String keyType = StringUtils.readNullableString(in);
+			Class<K> keyClass;
 			try {
 				keyClass = (Class<K>) Class.forName(keyType);
 			} catch (ClassNotFoundException e) {
@@ -96,15 +88,15 @@ public class SerializableHashMap<K extends IOReadableWritable, V extends IOReada
 
 			key.read(in);
 
-			final String valueType = StringRecord.readString(in);
-			Class<V> valueClass = null;
+			final String valueType = StringUtils.readNullableString(in);
+			Class<V> valueClass;
 			try {
 				valueClass = (Class<V>) Class.forName(valueType);
 			} catch (ClassNotFoundException e) {
 				throw new IOException(StringUtils.stringifyException(e));
 			}
 
-			V value = null;
+			V value;
 			try {
 				value = valueClass.newInstance();
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
index 502f000..fb7b06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableHashSet.java
@@ -16,25 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.Iterator;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
 
 /**
  * This class extends a standard {@link java.util.HashSet} by implementing the
- * {@link org.apache.flink.core.io.IOReadableWritable} interface. As a result, hash sets of this type can be used
- * with Nephele's RPC system.
- * <p>
- * This class is not thread-safe.
+ * {@link org.apache.flink.core.io.IOReadableWritable} interface.
  * 
  * @param <T>
  *        the type used in this hash set
@@ -49,49 +43,36 @@ public class SerializableHashSet<T extends IOReadableWritable> extends HashSet<T
 
 	@Override
 	public void write(final DataOutputView out) throws IOException {
-
 		out.writeInt(size());
 
-		final Iterator<T> it = iterator();
-
-		while (it.hasNext()) {
-
-			final T entry = it.next();
-			StringRecord.writeString(out, entry.getClass().getName());
+		for (T entry : this) {
+			StringUtils.writeNullableString(entry.getClass().getName(), out);
 			entry.write(out);
 		}
 	}
 
-
-	@SuppressWarnings("unchecked")
-	// TODO: See if type safety can be improved here
 	@Override
 	public void read(final DataInputView in) throws IOException {
-
 		final int numberOfMapEntries = in.readInt();
 
 		for (int i = 0; i < numberOfMapEntries; i++) {
+			final String type = StringUtils.readNullableString(in);
 
-			final String type = StringRecord.readString(in);
-			Class<T> clazz = null;
-			try {
-				clazz = (Class<T>) Class.forName(type);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			}
-
-			T entry = null;
+			T entry;
 			try {
+				@SuppressWarnings("unchecked")
+				Class<T> clazz = (Class<T>) Class.forName(type);
 				entry = clazz.newInstance();
-			} catch (Exception e) {
-				throw new IOException(StringUtils.stringifyException(e));
+			}
+			catch (ClassNotFoundException e) {
+				throw new IOException(e);
+			}
+			catch (Exception e) {
+				throw new IOException(e);
 			}
 
 			entry.read(in);
-
 			add(entry);
 		}
-
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index 89cc50d..c9ca9fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.BufferedInputStream;
@@ -32,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.List;
 
+import org.apache.flink.types.IntValue;
 import org.junit.Assert;
 
 import org.slf4j.Logger;
@@ -39,24 +39,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
-import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- *
- *
- */
+
 public class IOManagerPerformanceBenchmark {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
@@ -246,7 +236,7 @@ public class IOManagerPerformanceBenchmark {
 		
 	private void speedTestStream(int bufferSize) throws IOException {
 		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
-		final IntegerRecord rec = new IntegerRecord(0);
+		final IntValue rec = new IntValue(0);
 		
 		File tempFile = null;
 		DataOutputStream daos = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
index 13aac80..a54f5d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/DefaultChannelSelectorTest.java
@@ -16,13 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
+import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
 /**
@@ -37,8 +36,8 @@ public class DefaultChannelSelectorTest {
 	@Test
 	public void channelSelect() {
 
-		final StringRecord dummyRecord = new StringRecord("abc");
-		final RoundRobinChannelSelector<StringRecord> selector = new RoundRobinChannelSelector<StringRecord>();
+		final StringValue dummyRecord = new StringValue("abc");
+		final RoundRobinChannelSelector<StringValue> selector = new RoundRobinChannelSelector<StringValue>();
 		// Test with two channels
 		final int numberOfOutputChannels = 2;
 		int[] selectedChannels = selector.selectChannels(dummyRecord, numberOfOutputChannels);

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 05812b5..f984ca9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.types.IntValue;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -130,19 +130,19 @@ public class SlotCountExceedingParallelismTest {
 
 		public final static String CONFIG_KEY = "number-of-times-to-send";
 
-		private RecordWriter<IntegerRecord> writer;
+		private RecordWriter<IntValue> writer;
 
 		private int numberOfTimesToSend;
 
 		@Override
 		public void registerInputOutput() {
-			writer = new RecordWriter<IntegerRecord>(getEnvironment().getWriter(0));
+			writer = new RecordWriter<IntValue>(getEnvironment().getWriter(0));
 			numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
 		}
 
 		@Override
 		public void invoke() throws Exception {
-			final IntegerRecord subtaskIndex = new IntegerRecord(
+			final IntValue subtaskIndex = new IntValue(
 					getEnvironment().getIndexInSubtaskGroup());
 
 			try {
@@ -164,7 +164,7 @@ public class SlotCountExceedingParallelismTest {
 
 		public final static String CONFIG_KEY = "number-of-indexes-to-receive";
 
-		private RecordReader<IntegerRecord> reader;
+		private RecordReader<IntValue> reader;
 
 		private int numberOfSubtaskIndexesToReceive;
 
@@ -173,9 +173,9 @@ public class SlotCountExceedingParallelismTest {
 
 		@Override
 		public void registerInputOutput() {
-			reader = new RecordReader<IntegerRecord>(
+			reader = new RecordReader<IntValue>(
 					getEnvironment().getInputGate(0),
-					IntegerRecord.class);
+					IntValue.class);
 
 			numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);
 			receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive);
@@ -184,7 +184,7 @@ public class SlotCountExceedingParallelismTest {
 		@Override
 		public void invoke() throws Exception {
 			try {
-				IntegerRecord record;
+				IntValue record;
 
 				int numberOfReceivedSubtaskIndexes = 0;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 70bbc25..ade14a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.types.IntegerRecord;
+import org.apache.flink.types.IntValue;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -131,7 +131,7 @@ public class ScheduleOrUpdateConsumersTest {
 
 		public final static String CONFIG_KEY = "number-of-times-to-send";
 
-		private List<RecordWriter<IntegerRecord>> writers = Lists.newArrayListWithCapacity(2);
+		private List<RecordWriter<IntValue>> writers = Lists.newArrayListWithCapacity(2);
 
 		private int numberOfTimesToSend;
 
@@ -139,11 +139,11 @@ public class ScheduleOrUpdateConsumersTest {
 		public void registerInputOutput() {
 			// The order of intermediate result creation in the job graph specifies which produced
 			// result partition is pipelined/blocking.
-			final RecordWriter<IntegerRecord> pipelinedWriter =
-					new RecordWriter<IntegerRecord>(getEnvironment().getWriter(0));
+			final RecordWriter<IntValue> pipelinedWriter =
+					new RecordWriter<IntValue>(getEnvironment().getWriter(0));
 
-			final RecordWriter<IntegerRecord> blockingWriter =
-					new RecordWriter<IntegerRecord>(getEnvironment().getWriter(1));
+			final RecordWriter<IntValue> blockingWriter =
+					new RecordWriter<IntValue>(getEnvironment().getWriter(1));
 
 			writers.add(pipelinedWriter);
 			writers.add(blockingWriter);
@@ -153,11 +153,11 @@ public class ScheduleOrUpdateConsumersTest {
 
 		@Override
 		public void invoke() throws Exception {
-			final IntegerRecord subtaskIndex = new IntegerRecord(
+			final IntValue subtaskIndex = new IntValue(
 					getEnvironment().getIndexInSubtaskGroup());
 
 			// Produce the first intermediate result and then the second in a serial fashion.
-			for (RecordWriter<IntegerRecord> writer : writers) {
+			for (RecordWriter<IntValue> writer : writers) {
 				try {
 					for (int i = 0; i < numberOfTimesToSend; i++) {
 						writer.emit(subtaskIndex);

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
deleted file mode 100644
index 6bd2f30..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/types/StringRecordTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-
-
-/**
- *         TODO: {@link StringRecord} has a lot of public methods that need to be tested.
- */
-public class StringRecordTest {
-
-	@Mock
-	private DataInput inputMock;
-
-	@Before
-	public void setUp() {
-		initMocks(this);
-	}
-
-	/**
-	 * Tests the serialization/deserialization of the {@link StringRecord} class.
-	 */
-	@Test
-	public void testStringRecord() {
-
-		final StringRecord orig = new StringRecord("Test Record");
-
-		try {
-
-			final StringRecord copy = (StringRecord) CommonTestUtils.createCopyWritable(orig);
-			
-			assertEquals(orig.getLength(), copy.getLength());
-			assertEquals(orig.toString(), copy.toString());
-			assertEquals(orig, copy);
-			assertEquals(orig.hashCode(), copy.hashCode());
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-	}
-
-	@Test
-	public void shouldReadProperInputs() {
-		try {
-
-			when(this.inputMock.readBoolean()).thenReturn(true);
-			when(this.inputMock.readInt()).thenReturn(10);
-
-			final String readString = StringRecord.readString(inputMock);
-			assertThat(readString, is(not(nullValue())));
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-
-	}
-
-	@Test
-	public void shouldReadNegativeInputs() {
-		
-		try {
-		
-			when(this.inputMock.readBoolean()).thenReturn(true);
-			when(this.inputMock.readInt()).thenReturn(-1);
-		} catch(IOException ioe) {
-			fail(ioe.getMessage());
-		}
-
-		try {
-			StringRecord.readString(inputMock);
-		} catch(IOException ioe) {
-			return;
-		}
-
-		fail("StringRecord.readString did not throw an IOException for negative length of string");		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
deleted file mode 100644
index 4f02c99..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/types/TypeTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.types;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.types.FileRecord;
-import org.apache.flink.runtime.types.IntegerRecord;
-import org.junit.Test;
-
-/**
- * This class contains test which check the correct serialization/deserialization of Nephele's built-in data types.
- * 
- */
-public class TypeTest {
-
-	/**
-	 * Tests the serialization/deserialization of the {@link FileRecord} class.
-	 */
-	@Test
-	public void testFileRecord() {
-
-		final FileRecord orig = new FileRecord("Test Filename");
-		final byte[] data = new byte[128];
-
-		orig.append(data, 0, data.length);
-		orig.append(data, 0, data.length);
-
-		assertEquals(orig.getDataBuffer().length, 2 * data.length);
-
-		try {
-			final FileRecord copy = (FileRecord) CommonTestUtils.createCopyWritable(orig);
-
-			assertEquals(orig.getFileName(), copy.getFileName());
-			assertEquals(orig, copy);
-			assertEquals(orig.hashCode(), copy.hashCode());
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-
-	}
-
-	/**
-	 * Tests the serialization/deserialization of the {@link IntegerRecord} class.
-	 */
-	@Test
-	public void testIntegerRecord() {
-
-		final IntegerRecord orig = new IntegerRecord(12);
-
-		try {
-
-			final IntegerRecord copy = (IntegerRecord) CommonTestUtils.createCopyWritable(orig);
-
-			assertEquals(orig.getValue(), copy.getValue());
-			assertEquals(orig, copy);
-			assertEquals(orig.hashCode(), copy.hashCode());
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		}
-
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12c555f5/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 9bb1a3b..f6f7cc6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
 import org.apache.flink.runtime.io.network.api.reader.RecordReader
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
-import org.apache.flink.runtime.types.IntegerRecord
+import org.apache.flink.types.IntValue
+
 
 object Tasks {
   class BlockingNoOpInvokable extends AbstractInvokable {
@@ -52,15 +53,15 @@ object Tasks {
   }
 
   class Sender extends AbstractInvokable{
-    var writer: RecordWriter[IntegerRecord] = _
+    var writer: RecordWriter[IntValue] = _
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+      writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {
       try{
-        writer.emit(new IntegerRecord(42))
-        writer.emit(new IntegerRecord(1337))
+        writer.emit(new IntValue(42))
+        writer.emit(new IntValue(1337))
         writer.flush()
       }finally{
         writer.clearBuffers()
@@ -69,12 +70,12 @@ object Tasks {
   }
 
   class Forwarder extends AbstractInvokable {
-    var reader: RecordReader[IntegerRecord] = _
-    var writer: RecordWriter[IntegerRecord] = _
+    var reader: RecordReader[IntValue] = _
+    var writer: RecordWriter[IntValue] = _
     override def registerInputOutput(): Unit = {
-      reader = new RecordReader[IntegerRecord](getEnvironment.getInputGate(0),
-        classOf[IntegerRecord])
-      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+      reader = new RecordReader[IntValue](getEnvironment.getInputGate(0),
+        classOf[IntValue])
+      writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {
@@ -97,12 +98,12 @@ object Tasks {
   }
 
   class Receiver extends AbstractInvokable {
-    var reader: RecordReader[IntegerRecord] = _
+    var reader: RecordReader[IntValue] = _
 
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+      reader = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
     }
 
     override def invoke(): Unit = {
@@ -154,12 +155,12 @@ object Tasks {
   }
 
   class AgnosticReceiver extends AbstractInvokable {
-    var reader: RecordReader[IntegerRecord] = _
+    var reader: RecordReader[IntValue] = _
 
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+      reader = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
     }
 
     override def invoke(): Unit = {
@@ -168,14 +169,14 @@ object Tasks {
   }
 
   class AgnosticBinaryReceiver extends AbstractInvokable {
-    var reader1: RecordReader[IntegerRecord] = _
-    var reader2: RecordReader[IntegerRecord] = _
+    var reader1: RecordReader[IntValue] = _
+    var reader2: RecordReader[IntValue] = _
 
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
-      reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
+      reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
+      reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue])
     }
 
     override def invoke(): Unit = {
@@ -185,16 +186,16 @@ object Tasks {
   }
 
   class AgnosticTertiaryReceiver extends AbstractInvokable {
-    var reader1: RecordReader[IntegerRecord] = _
-    var reader2: RecordReader[IntegerRecord] = _
-    var reader3: RecordReader[IntegerRecord] = _
+    var reader1: RecordReader[IntValue] = _
+    var reader2: RecordReader[IntValue] = _
+    var reader3: RecordReader[IntValue] = _
 
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
-      reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
-      reader3 = new RecordReader[IntegerRecord](env.getInputGate(2), classOf[IntegerRecord])
+      reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
+      reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue])
+      reader3 = new RecordReader[IntValue](env.getInputGate(2), classOf[IntValue])
     }
 
     override def invoke(): Unit = {
@@ -205,10 +206,10 @@ object Tasks {
   }
 
   class ExceptionSender extends AbstractInvokable{
-    var writer: RecordWriter[IntegerRecord] = _
+    var writer: RecordWriter[IntValue] = _
 
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+      writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {
@@ -217,10 +218,10 @@ object Tasks {
   }
 
   class SometimesExceptionSender extends AbstractInvokable {
-    var writer: RecordWriter[IntegerRecord] = _
+    var writer: RecordWriter[IntValue] = _
 
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+      writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {
@@ -240,7 +241,7 @@ object Tasks {
 
   class ExceptionReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
+      new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
     }
 
     override def invoke(): Unit = {
@@ -266,7 +267,7 @@ object Tasks {
     }
 
     override def registerInputOutput(): Unit = {
-      new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+      new RecordWriter[IntValue](getEnvironment.getWriter(0))
     }
 
     override def invoke(): Unit = {
@@ -281,7 +282,7 @@ object Tasks {
 
   class BlockingReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
+      new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
     }
 
     override def invoke(): Unit = {