You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/01/06 11:45:30 UTC

[3/4] incubator-flink git commit: [FLINK-1349] [runtime] Various fixes concerning Akka - Remove obsolete code from old IPC net utils - Smaller Writable/IOReadableWritable serialzation buffer start size (most messages are rather small) - For message lo

[FLINK-1349] [runtime] Various fixes concerning Akka
 - Remove obsolete code from old IPC net utils
 - Smaller Writable/IOReadableWritable serialzation buffer start size (most messages are rather small)
 - For message logging, make system calls (timestamps) only in debug mode
 - Clean up warnings / code simplifications


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/972a7b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/972a7b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/972a7b01

Branch: refs/heads/master
Commit: 972a7b01980b4b06d4893df6aa6aebd4e26e5990
Parents: 24c4736
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Dec 28 10:55:21 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 6 11:07:02 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java  |   2 +-
 .../flink/configuration/ConfigConstants.java    |  11 +-
 .../java/org/apache/flink/core/fs/Path.java     |  23 +-
 flink-dist/pom.xml                              |   2 +-
 flink-runtime/pom.xml                           |   1 -
 .../runtime/accumulators/AccumulatorEvent.java  |   2 +
 .../org/apache/flink/runtime/blob/BlobKey.java  |  88 ++--
 .../deployment/ChannelDeploymentDescriptor.java |   2 +
 .../deployment/GateDeploymentDescriptor.java    |   2 +
 .../deployment/TaskDeploymentDescriptor.java    |  11 +
 .../runtime/fs/hdfs/DistributedFileStatus.java  |  10 +-
 .../network/ConnectionInfoLookupResponse.java   |   4 +-
 .../runtime/io/network/RemoteReceiver.java      |   2 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |  18 +-
 .../runtime/jobmanager/scheduler/SubSlot.java   |   3 +
 .../runtime/jobmanager/web/WebInfoServer.java   |  11 +-
 .../org/apache/flink/runtime/net/NetUtils.java  | 317 ++-----------
 .../flink/runtime/net/SocketIOWithTimeout.java  | 457 -------------------
 .../flink/runtime/net/SocketInputStream.java    | 179 --------
 .../flink/runtime/net/SocketOutputStream.java   | 222 ---------
 .../apache/flink/runtime/ActorLogMessages.scala |  16 +-
 .../IOReadableWritableSerializer.scala          |   2 +-
 .../akka/serialization/WritableSerializer.scala |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 341 +++++++-------
 .../ExecutionGraphDeploymentTest.java           |   2 +-
 .../ExecutionVertexCancelTest.java              |   8 +-
 .../runtime/operators/DataSinkTaskTest.java     |  17 +-
 .../runtime/operators/DataSourceTaskTest.java   |   3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  17 +
 flink-tests/pom.xml                             |   1 -
 .../CustomCompensatableDanglingPageRank.java    |   4 +-
 ...mpensatableDanglingPageRankWithCombiner.java |   4 +-
 .../javaApiOperators/GroupReduceITCase.java     |  64 +--
 33 files changed, 372 insertions(+), 1476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 6b541bf..bd5659a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -121,7 +121,7 @@ public class Utils {
 		// chain-in a new classloader
 		URL fileUrl = null;
 		try {
-			fileUrl = path.toURL();
+			fileUrl = path.toURI().toURL();
 		} catch (MalformedURLException e) {
 			throw new RuntimeException("Erroneous config file path", e);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2d97504..df96339 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -295,7 +295,7 @@ public final class ConfigConstants {
 	public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";
 
 	/**
-	 * Hearbeat interval of the transport failure detector
+	 * Heartbeat interval of the transport failure detector
 	 */
 	public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "akka.transport.heartbeat.interval";
 
@@ -466,11 +466,6 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
 
 	/**
-	 * The default interval for TaskManager heart beats (5000 msecs).
-	 */
-	public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 5000;
-
-	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 */
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -559,8 +554,8 @@ public final class ConfigConstants {
 	/**
 	 * The default directory to store temporary objects (e.g. during file uploads).
 	 */
-	public static final String DEFAULT_WEB_TMP_DIR = System.getProperty("java.io.tmpdir") == null ? "/tmp" : System
-		.getProperty("java.io.tmpdir");
+	public static final String DEFAULT_WEB_TMP_DIR = 
+			System.getProperty("java.io.tmpdir") == null ? "/tmp" : System.getProperty("java.io.tmpdir");
 
 	/**
 	 * The default directory for temporary plan dumps from the web frontend.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 950a38b..a104d86 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -16,16 +16,12 @@
  * limitations under the License.
  */
 
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
+/* This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
+ * additional information regarding copyright ownership. */
 
 package org.apache.flink.core.fs;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -501,19 +497,4 @@ public class Path implements IOReadableWritable, Serializable {
 		}
 
 	}
-	
-	public static String constructTestPath(Class<?> forClass, String folder) {
-		// we create test path that depends on class to prevent name clashes when two tests
-		// create temp files with the same name
-		String path = System.getProperty("java.io.tmpdir");
-		if (!(path.endsWith("/") || path.endsWith("\\")) ) {
-			path += System.getProperty("file.separator");
-		}
-		path += (forClass.getName() + "-" + folder);
-		return path;
-	}
-	
-	public static String constructTestURI(Class<?> forClass, String folder) {
-		return new File(constructTestPath(forClass, folder)).toURI().toString();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index f73a000..e753a05 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -210,7 +210,7 @@ under the License.
 					</plugin>
 					<plugin>
 						<artifactId>maven-assembly-plugin</artifactId>
-						<version>2.4</version>
+						<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 						<executions>
 							<!-- yarn bin directory -->
 							<execution>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b87977e..5a9ca8a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -130,7 +130,6 @@ under the License.
 		<dependency>
 			<groupId>com.github.scopt</groupId>
 			<artifactId>scopt_2.10</artifactId>
-			<version>3.2.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index 761248b..3fed259 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -39,6 +39,8 @@ import org.apache.flink.util.InstantiationUtil;
  */
 public class AccumulatorEvent implements Serializable {
 
+	private static final long serialVersionUID = 8965894516006882735L;
+
 	private JobID jobID;
 	
 	private Map<String, Accumulator<?, ?>> accumulators;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index c12fd23..e3d237d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -32,19 +32,16 @@ import java.util.Arrays;
  */
 public final class BlobKey implements Serializable, Comparable<BlobKey> {
 
-	/**
-	 * Array of hex characters to facilitate fast toString() method.
-	 */
+	private static final long serialVersionUID = 3847117712521785209L;
+
+	/** Array of hex characters to facilitate fast toString() method. */
 	private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();
 
-	/**
-	 * Size of the internal BLOB key in bytes.
-	 */
+	/** Size of the internal BLOB key in bytes. */
 	private static final int SIZE = 20;
 
-	/**
-	 * The byte buffer storing the actual key data.
-	 */
+	
+	/** The byte buffer storing the actual key data. */
 	private final byte[] key;
 
 	/**
@@ -60,8 +57,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	 * @param key
 	 *        the actual key data
 	 */
-	BlobKey(final byte[] key) {
-
+	BlobKey(byte[] key) {
 		if (key.length != SIZE) {
 			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
 		}
@@ -70,8 +66,15 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	}
 
 	/**
-	 * {@inheritDoc}
+	 * Adds the BLOB key to the given {@link MessageDigest}.
+	 * 
+	 * @param md
+	 *        the message digest to add the BLOB key to
 	 */
+	public void addToMessageDigest(MessageDigest md) {
+		md.update(this.key);
+	}
+	
 	@Override
 	public boolean equals(final Object obj) {
 
@@ -84,17 +87,11 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 		return Arrays.equals(this.key, bk.key);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	@Override
 	public int hashCode() {
 		return Arrays.hashCode(this.key);
 	}
 
-	/**
-	 * {@inheritDoc}
-	 */
 	@Override
 	public String toString() {
 		// from http://stackoverflow.com/questions/9655181/convert-from-byte-array-to-hex-string-in-java
@@ -108,6 +105,26 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 		return new String(hexChars);
 	}
 
+	@Override
+	public int compareTo(BlobKey o) {
+	
+		final byte[] aarr = this.key;
+		final byte[] barr = o.key;
+		final int len = Math.min(aarr.length, barr.length);
+	
+		for (int i = 0; i < len; ++i) {
+			final int a = (aarr[i] & 0xff);
+			final int b = (barr[i] & 0xff);
+			if (a != b) {
+				return a - b;
+			}
+		}
+	
+		return aarr.length - barr.length;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Auxiliary method to read a BLOB key from an input stream.
 	 * 
@@ -117,7 +134,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	 * @throws IOException
 	 *         throw if an I/O error occurs while reading from the input stream
 	 */
-	static BlobKey readFromInputStream(final InputStream inputStream) throws IOException {
+	static BlobKey readFromInputStream(InputStream inputStream) throws IOException {
 
 		final byte[] key = new byte[BlobKey.SIZE];
 
@@ -142,39 +159,6 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	 *         thrown if an I/O error occurs while writing the BLOB key
 	 */
 	void writeToOutputStream(final OutputStream outputStream) throws IOException {
-
 		outputStream.write(this.key);
 	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public int compareTo(final BlobKey o) {
-
-		final byte[] aarr = this.key;
-		final byte[] barr = o.key;
-		final int len = Math.min(aarr.length, barr.length);
-
-		for (int i = 0; i < len; ++i) {
-			final int a = (aarr[i] & 0xff);
-			final int b = (barr[i] & 0xff);
-			if (a != b) {
-				return a - b;
-			}
-		}
-
-		return aarr.length - barr.length;
-	}
-
-	/**
-	 * Adds the BLOB key to the given {@link MessageDigest}.
-	 * 
-	 * @param md
-	 *        the message digest to add the BLOB key to
-	 */
-	public void addToMessageDigest(final MessageDigest md) {
-
-		md.update(this.key);
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
index 2c90091..6fff4a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ChannelDeploymentDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.io.network.channels.ChannelID;
  */
 public final class ChannelDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = -4079084629425460213L;
+
 	/** The ID of the output channel. */
 	private final ChannelID outputChannelID;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
index 1f73546..c676023 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/GateDeploymentDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
  */
 public final class GateDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = -8433936680266802364L;
+
 	/** The list of channel deployment descriptors attached to this gate. */
 	private final List<ChannelDeploymentDescriptor> channels;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 20157d9..75a3c9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  */
 public final class TaskDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = -3233562176034358530L;
+
 	/** The ID of the job the tasks belongs to. */
 	private final JobID jobID;
 
@@ -192,6 +194,15 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	public int getCurrentNumberOfSubtasks() {
 		return this.currentNumberOfSubtasks;
 	}
+	
+	/**
+	 * Gets the number of the slot into which the task is to be deployed.
+	 * 
+	 * @return The number of the target slot.
+	 */
+	public int getTargetSlotNumber() {
+		return targetSlotNumber;
+	}
 
 	/**
 	 * Returns the configuration of the job the task belongs to.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
index 6b819dc..d8c9873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileStatus.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.core.fs.FileStatus;
@@ -25,7 +24,6 @@ import org.apache.flink.core.fs.Path;
 /**
  * Concrete implementation of the {@link FileStatus} interface for the
  * Hadoop Distribution File System.
- * 
  */
 public final class DistributedFileStatus implements FileStatus {
 
@@ -43,13 +41,11 @@ public final class DistributedFileStatus implements FileStatus {
 
 	@Override
 	public long getLen() {
-
 		return fileStatus.getLen();
 	}
 
 	@Override
 	public long getBlockSize() {
-
 		long blocksize = fileStatus.getBlockSize();
 		if (blocksize > fileStatus.getLen()) {
 			return fileStatus.getLen();
@@ -60,19 +56,16 @@ public final class DistributedFileStatus implements FileStatus {
 
 	@Override
 	public long getAccessTime() {
-
 		return fileStatus.getAccessTime();
 	}
 
 	@Override
 	public long getModificationTime() {
-
 		return fileStatus.getModificationTime();
 	}
 
 	@Override
 	public short getReplication() {
-
 		return fileStatus.getReplication();
 	}
 
@@ -82,13 +75,12 @@ public final class DistributedFileStatus implements FileStatus {
 
 	@Override
 	public Path getPath() {
-
 		return new Path(fileStatus.getPath().toString());
 	}
 
+	@SuppressWarnings("deprecation")
 	@Override
 	public boolean isDir() {
-
 		return fileStatus.isDir();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
index 547ce70..51a0f94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import java.io.Serializable;
@@ -25,6 +24,9 @@ import org.apache.flink.runtime.io.network.channels.ChannelID;
 
 public class ConnectionInfoLookupResponse implements Serializable {
 
+	private static final long serialVersionUID = 3961171754642077522L;
+
+	
 	private enum ReturnCode {
 		NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
 	};

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 78dcbae..436d07d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.memory.DataOutputView;
  */
 public final class RemoteReceiver implements IOReadableWritable, Serializable {
 
+	private static final long serialVersionUID = 4304924747853162443L;
+
 	/**
 	 * The address of the connection to the remote TaskManager.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 3497824..a288357 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -43,10 +43,12 @@ import org.apache.flink.runtime.blob.BlobKey;
  */
 public class JobGraph implements Serializable {
 
+	private static final long serialVersionUID = 1L;
+	
 	// --------------------------------------------------------------------------------------------
 	// Members that define the structure / topology of the graph
 	// --------------------------------------------------------------------------------------------
-	
+
 	/** List of task vertices included in this job graph. */
 	private final Map<JobVertexID, AbstractJobVertex> taskVertices = new LinkedHashMap<JobVertexID, AbstractJobVertex>();
 
@@ -54,7 +56,7 @@ public class JobGraph implements Serializable {
 	private final Configuration jobConfiguration = new Configuration();
 
 	/** Set of JAR files required to run this job. */
-	private final transient List<Path> userJars = new ArrayList<Path>();
+	private final List<Path> userJars = new ArrayList<Path>();
 
 	/** Set of blob keys identifying the JAR files required to run this job. */
 	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
@@ -357,7 +359,6 @@ public class JobGraph implements Serializable {
 	 * @return set of BLOB keys referring to the JAR files required to run this job
 	 */
 	public List<BlobKey> getUserJarBlobKeys() {
-
 		return this.userJarBlobKeys;
 	}
 
@@ -369,15 +370,13 @@ public class JobGraph implements Serializable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs during the upload
 	 */
-	public void uploadRequiredJarFiles(final InetSocketAddress serverAddress) throws IOException {
-
+	public void uploadRequiredJarFiles(InetSocketAddress serverAddress) throws IOException {
 		if (this.userJars.isEmpty()) {
 			return;
 		}
 
 		BlobClient bc = null;
 		try {
-
 			bc = new BlobClient(serverAddress);
 
 			for (final Path jar : this.userJars) {
@@ -388,14 +387,15 @@ public class JobGraph implements Serializable {
 					is = fs.open(jar);
 					final BlobKey key = bc.put(is);
 					this.userJarBlobKeys.add(key);
-				} finally {
+				}
+				finally {
 					if (is != null) {
 						is.close();
 					}
 				}
 			}
-
-		} finally {
+		}
+		finally {
 			if (bc != null) {
 				bc.close();
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
index 0b19a5f..92d457b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SubSlot.java
@@ -23,6 +23,9 @@ import org.apache.flink.runtime.instance.AllocatedSlot;
 
 public class SubSlot extends AllocatedSlot {
 
+	private static final long serialVersionUID = 1361615219044538497L;
+	
+
 	private final SharedSlot sharedSlot;
 	
 	private final AbstractID groupId;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 7e67603..24dbaf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -85,15 +85,16 @@ public class WebInfoServer {
 	 */
 	public WebInfoServer(Configuration config, ActorRef jobmanager,
 						ActorRef archive, FiniteDuration timeout) throws IOException {
-		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-
-		this.timeout = timeout;
-
+		
 		// if no explicit configuration is given, use the global configuration
 		if (config == null) {
 			config = GlobalConfiguration.getConfiguration();
 		}
+		
+		this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		
+		this.timeout = timeout;
 
 		// get base path of Flink installation
 		final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 2795158..ec2633c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -16,18 +16,9 @@
  * limitations under the License.
  */
 
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
 package org.apache.flink.runtime.net;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -35,286 +26,15 @@ import java.net.NetworkInterface;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import javax.net.SocketFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NetUtils {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
 
-	private static Map<String, String> hostToResolved = new HashMap<String, String>();
-
-	public static SocketFactory getSocketFactory() {
-
-		return getDefaultSocketFactory();
-	}
-
-	public static SocketFactory getDefaultSocketFactory() {
-
-		return SocketFactory.getDefault();
-	}
-
-	/**
-	 * Util method to build socket addr from either:
-	 * <host>:<post>
-	 * <fs>://<host>:<port>/<path>
-	 */
-	public static InetSocketAddress createSocketAddr(String target) {
-		return createSocketAddr(target, -1);
-	}
-
-	/**
-	 * Util method to build socket addr from either:
-	 * <host>
-	 * <host>:<post>
-	 * <fs>://<host>:<port>/<path>
-	 */
-	public static InetSocketAddress createSocketAddr(String target, int defaultPort) {
-		int colonIndex = target.indexOf(':');
-		if (colonIndex < 0 && defaultPort == -1) {
-			throw new RuntimeException("Not a host:port pair: " + target);
-		}
-		String hostname = "";
-		int port = -1;
-		if (!target.contains("/")) {
-			if (colonIndex == -1) {
-				hostname = target;
-			} else {
-				// must be the old style <host>:<port>
-				hostname = target.substring(0, colonIndex);
-				port = Integer.parseInt(target.substring(colonIndex + 1));
-			}
-		} else {
-			// a new uri
-			try {
-				URI addr = new URI(target);
-				hostname = addr.getHost();
-				port = addr.getPort();
-			} catch (URISyntaxException use) {
-				LOG.error("Invalid URI syntax.", use);
-			}
-		}
-
-		if (port == -1) {
-			port = defaultPort;
-		}
-
-		if (getStaticResolution(hostname) != null) {
-			hostname = getStaticResolution(hostname);
-		}
-		return new InetSocketAddress(hostname, port);
-	}
-
-	/**
-	 * Adds a static resolution for host. This can be used for setting up
-	 * hostnames with names that are fake to point to a well known host. For e.g.
-	 * in some testcases we require to have daemons with different hostnames
-	 * running on the same machine. In order to create connections to these
-	 * daemons, one can set up mappings from those hostnames to "localhost".
-	 * {@link NetUtils#getStaticResolution(String)} can be used to query for
-	 * the actual hostname.
-	 * 
-	 * @param host
-	 * @param resolvedName
-	 */
-	public static void addStaticResolution(String host, String resolvedName) {
-		synchronized (hostToResolved) {
-			hostToResolved.put(host, resolvedName);
-		}
-	}
-
-	/**
-	 * Retrieves the resolved name for the passed host. The resolved name must
-	 * have been set earlier using {@link NetUtils#addStaticResolution(String, String)}
-	 * 
-	 * @param host
-	 * @return the resolution
-	 */
-	public static String getStaticResolution(String host) {
-		synchronized (hostToResolved) {
-			return hostToResolved.get(host);
-		}
-	}
-
-	/**
-	 * This is used to get all the resolutions that were added using
-	 * {@link NetUtils#addStaticResolution(String, String)}. The return
-	 * value is a List each element of which contains an array of String
-	 * of the form String[0]=hostname, String[1]=resolved-hostname
-	 * 
-	 * @return the list of resolutions
-	 */
-	public static List<String[]> getAllStaticResolutions() {
-		synchronized (hostToResolved) {
-			Set<Entry<String, String>> entries = hostToResolved.entrySet();
-			if (entries.size() == 0) {
-				return null;
-			}
-			List<String[]> l = new ArrayList<String[]>(entries.size());
-			for (Entry<String, String> e : entries) {
-				l.add(new String[] { e.getKey(), e.getValue() });
-			}
-			return l;
-		}
-	}
-
-	/**
-	 * Same as getInputStream(socket, socket.getSoTimeout()).<br>
-	 * <br>
-	 * From documentation for {@link #getInputStream(Socket, long)}:<br>
-	 * Returns InputStream for the socket. If the socket has an associated
-	 * SocketChannel then it returns a {@link SocketInputStream} with the given timeout. If the socket does not
-	 * have a channel, {@link Socket#getInputStream()} is returned. In the later
-	 * case, the timeout argument is ignored and the timeout set with {@link Socket#setSoTimeout(int)} applies for
-	 * reads.<br>
-	 * <br>
-	 * Any socket created using socket factories returned by {@link #NetUtils},
-	 * must use this interface instead of {@link Socket#getInputStream()}.
-	 * 
-	 * @see #getInputStream(Socket, long)
-	 * @param socket
-	 * @return InputStream for reading from the socket.
-	 * @throws IOException
-	 */
-	public static InputStream getInputStream(Socket socket) throws IOException {
-		return getInputStream(socket, socket.getSoTimeout());
-	}
-
-	/**
-	 * Returns InputStream for the socket. If the socket has an associated
-	 * SocketChannel then it returns a {@link SocketInputStream} with the given timeout. If the socket does not
-	 * have a channel, {@link Socket#getInputStream()} is returned. In the later
-	 * case, the timeout argument is ignored and the timeout set with {@link Socket#setSoTimeout(int)} applies for
-	 * reads.<br>
-	 * <br>
-	 * Any socket created using socket factories returned by {@link #NetUtils},
-	 * must use this interface instead of {@link Socket#getInputStream()}.
-	 * 
-	 * @see Socket#getChannel()
-	 * @param socket
-	 * @param timeout
-	 *        timeout in milliseconds. This may not always apply. zero
-	 *        for waiting as long as necessary.
-	 * @return InputStream for reading from the socket.
-	 * @throws IOException
-	 */
-	public static InputStream getInputStream(Socket socket, long timeout) throws IOException {
-		return (socket.getChannel() == null) ? socket.getInputStream() : new SocketInputStream(socket, timeout);
-	}
-
-	/**
-	 * Same as getOutputStream(socket, 0). Timeout of zero implies write will
-	 * wait until data is available.<br>
-	 * <br>
-	 * From documentation for {@link #getOutputStream(Socket, long)} : <br>
-	 * Returns OutputStream for the socket. If the socket has an associated
-	 * SocketChannel then it returns a {@link SocketOutputStream} with the given timeout. If the socket does not
-	 * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-	 * case, the timeout argument is ignored and the write will wait until
-	 * data is available.<br>
-	 * <br>
-	 * Any socket created using socket factories returned by {@link #NetUtils},
-	 * must use this interface instead of {@link Socket#getOutputStream()}.
-	 * 
-	 * @see #getOutputStream(Socket, long)
-	 * @param socket
-	 * @return OutputStream for writing to the socket.
-	 * @throws IOException
-	 */
-	public static OutputStream getOutputStream(Socket socket) throws IOException {
-		return getOutputStream(socket, 0);
-	}
-
-	/**
-	 * Returns OutputStream for the socket. If the socket has an associated
-	 * SocketChannel then it returns a {@link SocketOutputStream} with the given timeout. If the socket does not
-	 * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-	 * case, the timeout argument is ignored and the write will wait until
-	 * data is available.<br>
-	 * <br>
-	 * Any socket created using socket factories returned by {@link #NetUtils},
-	 * must use this interface instead of {@link Socket#getOutputStream()}.
-	 * 
-	 * @see Socket#getChannel()
-	 * @param socket
-	 * @param timeout
-	 *        timeout in milliseconds. This may not always apply. zero
-	 *        for waiting as long as necessary.
-	 * @return OutputStream for writing to the socket.
-	 * @throws IOException
-	 */
-	public static OutputStream getOutputStream(Socket socket, long timeout) throws IOException {
-		return (socket.getChannel() == null) ? socket.getOutputStream() : new SocketOutputStream(socket, timeout);
-	}
-
-	public static void connect(Socket socket, SocketAddress endpoint, int timeout) throws IOException {
-		if (socket == null || endpoint == null || timeout < 0) {
-			throw new IllegalArgumentException("Illegal argument for connect()");
-		}
-
-		SocketChannel ch = socket.getChannel();
-
-		if (ch == null) {
-			// let the default implementation handle it.
-			socket.connect(endpoint, timeout);
-		} else {
-			SocketIOWithTimeout.connect(ch, endpoint, timeout);
-		}
-	}
-
-	/**
-	 * Given a string representation of a host, return its ip address
-	 * in textual presentation.
-	 * 
-	 * @param name
-	 *        a string representation of a host:
-	 *        either a textual representation its IP address or its host name
-	 * @return its IP address in the string format
-	 */
-	public static String normalizeHostName(String name) {
-		if (Character.digit(name.charAt(0), 16) != -1) { // it is an IP
-			return name;
-		} else {
-			try {
-				InetAddress ipAddress = InetAddress.getByName(name);
-				return ipAddress.getHostAddress();
-			} catch (UnknownHostException e) {
-				return name;
-			}
-		}
-	}
-
-	/**
-	 * Given a collection of string representation of hosts, return a list of
-	 * corresponding IP addresses in the textual representation.
-	 * 
-	 * @param names
-	 *        a collection of string representations of hosts
-	 * @return a list of corresponding IP addresses in the string format
-	 * @see #normalizeHostName(String)
-	 */
-	public static List<String> normalizeHostNames(Collection<String> names) {
-		List<String> hostNames = new ArrayList<String>(names.size());
-		for (String name : names) {
-			hostNames.add(normalizeHostName(name));
-		}
-		return hostNames;
-	}
-
 	/**
 	 * The states of address detection mechanism.
 	 * There is only a state transition if the current state failed to determine the address.
@@ -344,11 +64,14 @@ public class NetUtils {
 
 		while (true) {
 			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+			
 			while (e.hasMoreElements()) {
 				NetworkInterface n = e.nextElement();
 				Enumeration<InetAddress> ee = n.getInetAddresses();
+				
 				while (ee.hasMoreElements()) {
 					InetAddress i = ee.nextElement();
+					
 					switch (strategy) {
 						case ADDRESS:
 							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
@@ -358,6 +81,7 @@ public class NetUtils {
 								}
 							}
 							break;
+							
 						case FAST_CONNECT:
 						case SLOW_CONNECT:
 							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
@@ -366,17 +90,22 @@ public class NetUtils {
 								return i;
 							}
 							break;
+							
 						case HEURISTIC:
-							LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
-									"isLinkLocalAddress:" + i.isLinkLocalAddress() +" " +
-									"isLoopbackAddress:" + i.isLoopbackAddress() + ".");
-							if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
+							if (LOG.isDebugEnabled()) {
+								LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
+										" isLinkLocalAddress:" + i.isLinkLocalAddress() +
+										" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
+							}
+							
+							if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
 								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
 										"loopback address. Using instead " + i.getHostAddress() + " on network " +
 										"interface " + n.getName() + ".");
 								return i;
 							}
 							break;
+							
 						default:
 							throw new RuntimeException("Unkown address detection strategy: " + strategy);
 					}
@@ -391,11 +120,11 @@ public class NetUtils {
 					strategy = AddressDetectionState.SLOW_CONNECT;
 					break;
 				case SLOW_CONNECT:
-					if(!InetAddress.getLocalHost().isLoopbackAddress()){
+					if (!InetAddress.getLocalHost().isLoopbackAddress()) {
 						LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
 								"IP address.");
 						return InetAddress.getLocalHost();
-					}else {
+					} else {
 						strategy = AddressDetectionState.HEURISTIC;
 						break;
 					}
@@ -437,7 +166,8 @@ public class NetUtils {
 				LOG.debug("Failed with exception", ex);
 			}
 			connectable = false;
-		} finally {
+		}
+		finally {
 			if (socket != null) {
 				socket.close();
 			}
@@ -454,13 +184,18 @@ public class NetUtils {
 				if (port != 0) {
 					return port;
 				}
-			} catch (IOException e) {
-				LOG.debug("Unable to allocate port " + e.getMessage(), e);
-			} finally {
+			}
+			catch (IOException e) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Unable to allocate port " + e.getMessage(), e);
+				}
+			}
+			finally {
 				if (serverSocket != null) {
 					try {
 						serverSocket.close();
 					} catch (Throwable t) {
+						// ignored
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
deleted file mode 100644
index daecfa5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This supports input and output streams for a socket channels.
- * These streams can have a timeout.
- */
-abstract class SocketIOWithTimeout {
-	// This is intentionally package private.
-
-	static final Logger LOG = LoggerFactory.getLogger(SocketIOWithTimeout.class);
-
-	private SelectableChannel channel;
-
-	private long timeout;
-
-	private boolean closed = false;
-
-	private static SelectorPool selector = new SelectorPool();
-
-	/*
-	 * A timeout value of 0 implies wait for ever.
-	 * We should have a value of timeout that implies zero wait.. i.e.
-	 * read or write returns immediately.
-	 * This will set channel to non-blocking.
-	 */
-	SocketIOWithTimeout(SelectableChannel channel, long timeout)
-																throws IOException {
-		checkChannelValidity(channel);
-
-		this.channel = channel;
-		this.timeout = timeout;
-		// Set non-blocking
-		channel.configureBlocking(false);
-	}
-
-	void close() {
-		closed = true;
-	}
-
-	boolean isOpen() {
-		return !closed && channel.isOpen();
-	}
-
-	SelectableChannel getChannel() {
-		return channel;
-	}
-
-	/**
-	 * Utility function to check if channel is ok.
-	 * Mainly to throw IOException instead of runtime exception
-	 * in case of mismatch. This mismatch can occur for many runtime
-	 * reasons.
-	 */
-	static void checkChannelValidity(Object channel) throws IOException {
-		if (channel == null) {
-			/*
-			 * Most common reason is that original socket does not have a channel.
-			 * So making this an IOException rather than a RuntimeException.
-			 */
-			throw new IOException("Channel is null. Check " + "how the channel or socket is created.");
-		}
-
-		if (!(channel instanceof SelectableChannel)) {
-			throw new IOException("Channel should be a SelectableChannel");
-		}
-	}
-
-	/**
-	 * Performs actual IO operations. This is not expected to block.
-	 * 
-	 * @param buf
-	 * @return number of bytes (or some equivalent). 0 implies underlying
-	 *         channel is drained completely. We will wait if more IO is
-	 *         required.
-	 * @throws IOException
-	 */
-	abstract int performIO(ByteBuffer buf) throws IOException;
-
-	/**
-	 * Performs one IO and returns number of bytes read or written.
-	 * It waits up to the specified timeout. If the channel is
-	 * not read before the timeout, SocketTimeoutException is thrown.
-	 * 
-	 * @param buf
-	 *        buffer for IO
-	 * @param ops
-	 *        Selection Ops used for waiting. Suggested values:
-	 *        SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
-	 *        writing.
-	 * @return number of bytes read or written. negative implies end of stream.
-	 * @throws IOException
-	 */
-	int doIO(ByteBuffer buf, int ops) throws IOException {
-
-		/*
-		 * For now only one thread is allowed. If user want to read or write
-		 * from multiple threads, multiple streams could be created. In that
-		 * case multiple threads work as well as underlying channel supports it.
-		 */
-		if (!buf.hasRemaining()) {
-			throw new IllegalArgumentException("Buffer has no data left.");
-			// or should we just return 0?
-		}
-
-		while (buf.hasRemaining()) {
-			if (closed) {
-				return -1;
-			}
-
-			try {
-				int n = performIO(buf);
-				if (n != 0) {
-					// successful io or an error.
-					return n;
-				}
-			} catch (IOException e) {
-				if (!channel.isOpen()) {
-					closed = true;
-				}
-				throw e;
-			}
-
-			// now wait for socket to be ready.
-			int count = 0;
-			try {
-				count = selector.select(channel, ops, timeout);
-			} catch (IOException e) { // unexpected IOException.
-				closed = true;
-				throw e;
-			}
-
-			if (count == 0) {
-				throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, ops));
-			}
-			// otherwise the socket should be ready for io.
-		}
-
-		return 0; // does not reach here.
-	}
-
-	/**
-	 * The contract is similar to {@link SocketChannel#connect(SocketAddress)} with a timeout.
-	 * 
-	 * @see SocketChannel#connect(SocketAddress)
-	 * @param channel
-	 *        - this should be a {@link SelectableChannel}
-	 * @param endpoint
-	 * @throws IOException
-	 */
-	static void connect(SocketChannel channel, SocketAddress endpoint, int timeout) throws IOException {
-
-		boolean blockingOn = channel.isBlocking();
-		if (blockingOn) {
-			channel.configureBlocking(false);
-		}
-
-		try {
-			if (channel.connect(endpoint)) {
-				return;
-			}
-
-			long timeoutLeft = timeout;
-			long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout) : 0;
-
-			while (true) {
-				// we might have to call finishConnect() more than once
-				// for some channels (with user level protocols)
-
-				int ret = selector.select((SelectableChannel) channel, SelectionKey.OP_CONNECT, timeoutLeft);
-
-				if (ret > 0 && channel.finishConnect()) {
-					return;
-				}
-
-				if (ret == 0 || (timeout > 0 && (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
-					throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, SelectionKey.OP_CONNECT));
-				}
-			}
-		} catch (IOException e) {
-			// javadoc for SocketChannel.connect() says channel should be closed.
-			try {
-				channel.close();
-			} catch (IOException ignored) {
-			}
-			throw e;
-		} finally {
-			if (blockingOn && channel.isOpen()) {
-				channel.configureBlocking(true);
-			}
-		}
-	}
-
-	/**
-	 * This is similar to {@link #doIO(ByteBuffer, int)} except that it
-	 * does not perform any I/O. It just waits for the channel to be ready
-	 * for I/O as specified in ops.
-	 * 
-	 * @param ops
-	 *        Selection Ops used for waiting
-	 * @throws SocketTimeoutException
-	 *         if select on the channel times out.
-	 * @throws IOException
-	 *         if any other I/O error occurs.
-	 */
-	void waitForIO(int ops) throws IOException {
-
-		if (selector.select(channel, ops, timeout) == 0) {
-			throw new SocketTimeoutException(timeoutExceptionString(channel, timeout, ops));
-		}
-	}
-
-	private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops) {
-
-		String waitingFor;
-		switch (ops) {
-
-		case SelectionKey.OP_READ:
-			waitingFor = "read";
-			break;
-
-		case SelectionKey.OP_WRITE:
-			waitingFor = "write";
-			break;
-
-		case SelectionKey.OP_CONNECT:
-			waitingFor = "connect";
-			break;
-
-		default:
-			waitingFor = "" + ops;
-		}
-
-		return timeout + " millis timeout while " + "waiting for channel to be ready for " + waitingFor + ". ch : "
-			+ channel;
-	}
-
-	/**
-	 * This maintains a pool of selectors. These selectors are closed
-	 * once they are idle (unused) for a few seconds.
-	 */
-	private static class SelectorPool {
-
-		private static class SelectorInfo {
-			Selector selector;
-
-			long lastActivityTime;
-
-			LinkedList<SelectorInfo> queue;
-
-			void close() {
-				if (selector != null) {
-					try {
-						selector.close();
-					} catch (IOException e) {
-						LOG.warn("Unexpected exception while closing selector : " + e.toString());
-					}
-				}
-			}
-		}
-
-		private static class ProviderInfo {
-			SelectorProvider provider;
-
-			LinkedList<SelectorInfo> queue; // lifo
-
-			ProviderInfo next;
-		}
-
-		private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
-
-		private ProviderInfo providerList = null;
-
-		/**
-		 * Waits on the channel with the given timeout using one of the
-		 * cached selectors. It also removes any cached selectors that are
-		 * idle for a few seconds.
-		 * 
-		 * @param channel
-		 * @param ops
-		 * @param timeout
-		 * @return
-		 * @throws IOException
-		 */
-		int select(SelectableChannel channel, int ops, long timeout) throws IOException {
-
-			SelectorInfo info = get(channel);
-
-			SelectionKey key = null;
-			int ret = 0;
-
-			try {
-				while (true) {
-					long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
-					key = channel.register(info.selector, ops);
-					ret = info.selector.select(timeout);
-
-					if (ret != 0) {
-						return ret;
-					}
-
-					/*
-					 * Sometimes select() returns 0 much before timeout for
-					 * unknown reasons. So select again if required.
-					 */
-					if (timeout > 0) {
-						timeout -= System.currentTimeMillis() - start;
-						if (timeout <= 0) {
-							return 0;
-						}
-					}
-
-					if (Thread.currentThread().isInterrupted()) {
-						throw new InterruptedIOException("Interruped while waiting for " + "IO on channel " + channel
-							+ ". " + timeout + " millis timeout left.");
-					}
-				}
-			} finally {
-				if (key != null) {
-					key.cancel();
-				}
-
-				// clear the canceled key.
-				try {
-					info.selector.selectNow();
-				} catch (IOException e) {
-					LOG.info("Unexpected Exception while clearing selector : " + e.toString());
-					// don't put the selector back.
-					info.close();
-					return ret;
-				}
-
-				release(info);
-			}
-		}
-
-		/**
-		 * Takes one selector from end of LRU list of free selectors.
-		 * If there are no selectors awailable, it creates a new selector.
-		 * Also invokes trimIdleSelectors().
-		 * 
-		 * @param channel
-		 * @return
-		 * @throws IOException
-		 */
-		private synchronized SelectorInfo get(SelectableChannel channel) throws IOException {
-			SelectorInfo selInfo = null;
-
-			SelectorProvider provider = channel.provider();
-
-			// pick the list : rarely there is more than one provider in use.
-			ProviderInfo pList = providerList;
-			while (pList != null && pList.provider != provider) {
-				pList = pList.next;
-			}
-			if (pList == null) {
-				// LOG.info("Creating new ProviderInfo : " + provider.toString());
-				pList = new ProviderInfo();
-				pList.provider = provider;
-				pList.queue = new LinkedList<SelectorInfo>();
-				pList.next = providerList;
-				providerList = pList;
-			}
-
-			LinkedList<SelectorInfo> queue = pList.queue;
-
-			if (queue.isEmpty()) {
-				Selector selector = provider.openSelector();
-				selInfo = new SelectorInfo();
-				selInfo.selector = selector;
-				selInfo.queue = queue;
-			} else {
-				selInfo = queue.removeLast();
-			}
-
-			trimIdleSelectors(System.currentTimeMillis());
-			return selInfo;
-		}
-
-		/**
-		 * puts selector back at the end of LRU list of free selectos.
-		 * Also invokes trimIdleSelectors().
-		 * 
-		 * @param info
-		 */
-		private synchronized void release(SelectorInfo info) {
-			long now = System.currentTimeMillis();
-			trimIdleSelectors(now);
-			info.lastActivityTime = now;
-			info.queue.addLast(info);
-		}
-
-		/**
-		 * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
-		 * traverse the whole list, just over the one that have crossed
-		 * the timeout.
-		 */
-		private void trimIdleSelectors(long now) {
-			long cutoff = now - IDLE_TIMEOUT;
-
-			for (ProviderInfo pList = providerList; pList != null; pList = pList.next) {
-				if (pList.queue.isEmpty()) {
-					continue;
-				}
-				for (Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
-					SelectorInfo info = it.next();
-					if (info.lastActivityTime > cutoff) {
-						break;
-					}
-					it.remove();
-					info.close();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
deleted file mode 100644
index eb22ad0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketInputStream.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * This implements an input stream that can have a timeout while reading.
- * This sets non-blocking flag on the socket channel.
- * So after create this object, read() on {@link Socket#getInputStream()} and write() on
- * {@link Socket#getOutputStream()} for the associated socket will throw
- * IllegalBlockingModeException.
- * Please use {@link SocketOutputStream} for writing.
- */
-public class SocketInputStream extends InputStream implements ReadableByteChannel {
-
-	private Reader reader;
-
-	private static class Reader extends SocketIOWithTimeout {
-		ReadableByteChannel channel;
-
-		Reader(ReadableByteChannel channel, long timeout)
-															throws IOException {
-			super((SelectableChannel) channel, timeout);
-			this.channel = channel;
-		}
-
-		int performIO(ByteBuffer buf) throws IOException {
-			return channel.read(buf);
-		}
-	}
-
-	/**
-	 * Create a new input stream with the given timeout. If the timeout
-	 * is zero, it will be treated as infinite timeout. The socket's
-	 * channel will be configured to be non-blocking.
-	 * 
-	 * @param channel
-	 *        Channel for reading, should also be a {@link SelectableChannel}.
-	 *        The channel will be configured to be non-blocking.
-	 * @param timeout
-	 *        timeout in milliseconds. must not be negative.
-	 * @throws IOException
-	 */
-	public SocketInputStream(ReadableByteChannel channel, long timeout)
-																		throws IOException {
-		SocketIOWithTimeout.checkChannelValidity(channel);
-		reader = new Reader(channel, timeout);
-	}
-
-	/**
-	 * Same as SocketInputStream(socket.getChannel(), timeout): <br>
-	 * <br>
-	 * Create a new input stream with the given timeout. If the timeout
-	 * is zero, it will be treated as infinite timeout. The socket's
-	 * channel will be configured to be non-blocking.
-	 * 
-	 * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-	 * @param socket
-	 *        should have a channel associated with it.
-	 * @param timeout
-	 *        timeout timeout in milliseconds. must not be negative.
-	 * @throws IOException
-	 */
-	public SocketInputStream(Socket socket, long timeout)
-															throws IOException {
-		this(socket.getChannel(), timeout);
-	}
-
-	/**
-	 * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
-	 * :<br>
-	 * <br>
-	 * Create a new input stream with the given timeout. If the timeout
-	 * is zero, it will be treated as infinite timeout. The socket's
-	 * channel will be configured to be non-blocking.
-	 * 
-	 * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-	 * @param socket
-	 *        should have a channel associated with it.
-	 * @throws IOException
-	 */
-	public SocketInputStream(Socket socket)
-											throws IOException {
-		this(socket.getChannel(), socket.getSoTimeout());
-	}
-
-	@Override
-	public int read() throws IOException {
-		/*
-		 * Allocation can be removed if required.
-		 * probably no need to optimize or encourage single byte read.
-		 */
-		byte[] buf = new byte[1];
-		int ret = read(buf, 0, 1);
-		if (ret > 0) {
-			return (byte) buf[0];
-		}
-		if (ret != -1) {
-			// unexpected
-			throw new IOException("Could not read from stream");
-		}
-		return ret;
-	}
-
-	public int read(byte[] b, int off, int len) throws IOException {
-		return read(ByteBuffer.wrap(b, off, len));
-	}
-
-	public synchronized void close() throws IOException {
-		/*
-		 * close the channel since Socket.getInputStream().close()
-		 * closes the socket.
-		 */
-		reader.channel.close();
-		reader.close();
-	}
-
-	/**
-	 * Returns underlying channel used by inputstream.
-	 * This is useful in certain cases like channel for
-	 * {@link java.nio.channels.FileChannel#transferFrom(ReadableByteChannel, long, long)}.
-	 */
-	public ReadableByteChannel getChannel() {
-		return reader.channel;
-	}
-
-	// ReadableByteChannel interface
-
-	public boolean isOpen() {
-		return reader.isOpen();
-	}
-
-	public int read(ByteBuffer dst) throws IOException {
-		return reader.doIO(dst, SelectionKey.OP_READ);
-	}
-
-	/**
-	 * waits for the underlying channel to be ready for reading.
-	 * The timeout specified for this stream applies to this wait.
-	 * 
-	 * @throws SocketTimeoutException
-	 *         if select on the channel times out.
-	 * @throws IOException
-	 *         if any other I/O error occurs.
-	 */
-	public void waitForReadable() throws IOException {
-		reader.waitForIO(SelectionKey.OP_READ);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
deleted file mode 100644
index 2df4bbe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketOutputStream.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. 
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * This implements an output stream that can have a timeout while writing.
- * This sets non-blocking flag on the socket channel.
- * So after creating this object , read() on {@link Socket#getInputStream()} and write() on
- * {@link Socket#getOutputStream()} on the associated socket will throw
- * llegalBlockingModeException.
- * Please use {@link SocketInputStream} for reading.
- */
-public class SocketOutputStream extends OutputStream implements WritableByteChannel {
-
-	private Writer writer;
-
-	private static class Writer extends SocketIOWithTimeout {
-		WritableByteChannel channel;
-
-		Writer(WritableByteChannel channel, long timeout)
-															throws IOException {
-			super((SelectableChannel) channel, timeout);
-			this.channel = channel;
-		}
-
-		int performIO(ByteBuffer buf) throws IOException {
-			return channel.write(buf);
-		}
-	}
-
-	/**
-	 * Create a new ouput stream with the given timeout. If the timeout
-	 * is zero, it will be treated as infinite timeout. The socket's
-	 * channel will be configured to be non-blocking.
-	 * 
-	 * @param channel
-	 *        Channel for writing, should also be a {@link SelectableChannel}.
-	 *        The channel will be configured to be non-blocking.
-	 * @param timeout
-	 *        timeout in milliseconds. must not be negative.
-	 * @throws IOException
-	 */
-	public SocketOutputStream(WritableByteChannel channel, long timeout)
-																		throws IOException {
-		SocketIOWithTimeout.checkChannelValidity(channel);
-		writer = new Writer(channel, timeout);
-	}
-
-	/**
-	 * Same as SocketOutputStream(socket.getChannel(), timeout):<br>
-	 * <br>
-	 * Create a new ouput stream with the given timeout. If the timeout
-	 * is zero, it will be treated as infinite timeout. The socket's
-	 * channel will be configured to be non-blocking.
-	 * 
-	 * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
-	 * @param socket
-	 *        should have a channel associated with it.
-	 * @param timeout
-	 *        timeout timeout in milliseconds. must not be negative.
-	 * @throws IOException
-	 */
-	public SocketOutputStream(Socket socket, long timeout)
-															throws IOException {
-		this(socket.getChannel(), timeout);
-	}
-
-	public void write(int b) throws IOException {
-		/*
-		 * If we need to, we can optimize this allocation.
-		 * probably no need to optimize or encourage single byte writes.
-		 */
-		final byte[] buf = new byte[1];
-		buf[0] = (byte) b;
-		write(buf, 0, 1);
-	}
-
-	public void write(byte[] b, int off, int len) throws IOException {
-		final ByteBuffer buf = ByteBuffer.wrap(b, off, len);
-		while (buf.hasRemaining()) {
-			try {
-				if (write(buf) < 0) {
-					throw new IOException("The stream is closed");
-				}
-			} catch (IOException e) {
-				/*
-				 * Unlike read, write can not inform user of partial writes.
-				 * So will close this if there was a partial write.
-				 */
-				if (buf.capacity() > buf.remaining()) {
-					writer.close();
-				}
-				throw e;
-			}
-		}
-	}
-
-	public synchronized void close() throws IOException {
-		/*
-		 * close the channel since Socket.getOuputStream().close()
-		 * closes the socket.
-		 */
-		writer.channel.close();
-		writer.close();
-	}
-
-	/**
-	 * Returns underlying channel used by this stream.
-	 * This is useful in certain cases like channel for {@link FileChannel#transferTo(long, long, WritableByteChannel)}
-	 */
-	public WritableByteChannel getChannel() {
-		return writer.channel;
-	}
-
-	// WritableByteChannle interface
-
-	public boolean isOpen() {
-		return writer.isOpen();
-	}
-
-	public int write(ByteBuffer src) throws IOException {
-		return writer.doIO(src, SelectionKey.OP_WRITE);
-	}
-
-	/**
-	 * waits for the underlying channel to be ready for writing.
-	 * The timeout specified for this stream applies to this wait.
-	 * 
-	 * @throws SocketTimeoutException
-	 *         if select on the channel times out.
-	 * @throws IOException
-	 *         if any other I/O error occurs.
-	 */
-	public void waitForWritable() throws IOException {
-		writer.waitForIO(SelectionKey.OP_WRITE);
-	}
-
-	/**
-	 * Transfers data from FileChannel using {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
-	 * Similar to readFully(), this waits till requested amount of
-	 * data is transfered.
-	 * 
-	 * @param fileCh
-	 *        FileChannel to transfer data from.
-	 * @param position
-	 *        position within the channel where the transfer begins
-	 * @param count
-	 *        number of bytes to transfer.
-	 * @throws EOFException
-	 *         If end of input file is reached before requested number of
-	 *         bytes are transfered.
-	 * @throws SocketTimeoutException
-	 *         If this channel blocks transfer longer than timeout for
-	 *         this stream.
-	 * @throws IOException
-	 *         Includes any exception thrown by {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
-	 */
-	public void transferToFully(FileChannel fileCh, long position, int count) throws IOException {
-
-		while (count > 0) {
-			/*
-			 * Ideally we should wait after transferTo returns 0. But because of
-			 * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
-			 * which throws an exception instead of returning 0, we wait for the
-			 * channel to be writable before writing to it. If you ever see
-			 * IOException with message "Resource temporarily unavailable"
-			 * thrown here, please let us know.
-			 * Once we move to JAVA SE 7, wait should be moved to correct place.
-			 */
-			waitForWritable();
-			final int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
-
-			if (nTransfered == 0) {
-				// check if end of file is reached.
-				if (position >= fileCh.size()) {
-					throw new EOFException("EOF Reached. file size is " + fileCh.size() + " and " + count
-						+ " more bytes left to be " + "transfered.");
-				}
-				// otherwise assume the socket is full.
-				// waitForWritable(); // see comment above.
-			} else if (nTransfered < 0) {
-				throw new IOException("Unexpected return of " + nTransfered + " from transferTo()");
-			} else {
-				position += nTransfered;
-				count -= nTransfered;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 892e68d..c8ec180 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -30,13 +30,17 @@ trait ActorLogMessages {
     override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
 
     override def apply(x: Any):Unit = {
-      if(log.isDebugEnabled) {
-        log.debug(s"Received message ${x} from ${self.sender}.")
+      if (!log.isDebugEnabled) {
+        _receiveWithLogMessages(x)
       }
-      val start = System.nanoTime()
-      _receiveWithLogMessages(x)
-      val duration = (System.nanoTime() - start) / 1000000
-      if(log.isDebugEnabled) {
+      else {
+        log.debug(s"Received message ${x} from ${self.sender}.")
+        
+        val start = System.nanoTime()
+        
+        _receiveWithLogMessages(x)
+        
+        val duration = (System.nanoTime() - start) / 1000000
         log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.")
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
index dff72bb..3a906b1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.serialization.{DataInputDeserializer,
 import org.apache.flink.util.InstantiationUtil
 
 class IOReadableWritableSerializer extends JSerializer {
-  val INITIAL_BUFFER_SIZE = 8096
+  val INITIAL_BUFFER_SIZE = 256
 
   override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
     val in = new DataInputDeserializer(bytes, 0, bytes.length)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/972a7b01/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
index 0ac7ed6..be1ef17 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
@@ -25,7 +25,7 @@ import org.apache.flink.util.InstantiationUtil
 import org.apache.hadoop.io.Writable
 
 class WritableSerializer extends JSerializer {
-  val INITIAL_BUFFER_SIZE = 8096
+  val INITIAL_BUFFER_SIZE = 256
 
   override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
     val in = new DataInputDeserializer(bytes, 0, bytes.length)