Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:26 UTC

[1/9] flink git commit: [FLINK-5824] Fix String/byte conversions without explicit encoding

Repository: flink
Updated Branches:
  refs/heads/master 65ccf7cdf -> 2592a19cc
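
For context: every change in this commit replaces a charset-less String/byte conversion. The no-argument String#getBytes() and the new String(byte[]) constructors use the JVM's platform default encoding, so the same code can produce different bytes on differently configured machines. A minimal standalone sketch of the hazard (illustrative class name, not Flink code):

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class DefaultCharsetHazard {
	public static void main(String[] args) {
		String s = "Grüße";

		// Implicit: the result depends on -Dfile.encoding / the OS locale.
		byte[] platformBytes = s.getBytes();

		// Explicit: identical bytes on every JVM.
		byte[] utf8Bytes = s.getBytes(StandardCharsets.UTF_8);

		System.out.println("default charset: " + Charset.defaultCharset());
		System.out.println("platform bytes:  " + platformBytes.length);
		System.out.println("UTF-8 bytes:     " + utf8Bytes.length); // 7: 'ü' and 'ß' take two bytes each
	}
}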


http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index a54b8dd..da83a06 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -27,6 +27,8 @@ import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_LONG;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_NULL;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_STRING;
+
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.python.api.types.CustomTypeWrapper;
 
 /**
@@ -177,7 +179,7 @@ public class PythonPlanReceiver implements Serializable {
 			int size = input.readInt();
 			byte[] buffer = new byte[size];
 			input.readFully(buffer);
-			return new String(buffer);
+			return new String(buffer, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..06af9d8 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -12,11 +12,14 @@
  */
 package org.apache.flink.python.api.streaming.plan;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
@@ -82,8 +85,8 @@ public class PythonPlanStreamer implements Serializable {
 		} catch (IllegalThreadStateException ise) {//Process still running
 		}
 
-		process.getOutputStream().write("plan\n".getBytes());
-		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
+		process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		process.getOutputStream().flush();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index fce69fa..f228327 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -12,11 +12,13 @@
  */
 package org.apache.flink.python.api.streaming.util;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.python.api.types.CustomTypeWrapper;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
 public class SerializationUtils {
 	public static final byte TYPE_BOOLEAN = (byte) 34;
 	public static final byte TYPE_BYTE = (byte) 33;
@@ -168,7 +170,7 @@ public class SerializationUtils {
 	public static class StringSerializer extends Serializer<String> {
 		@Override
 		public byte[] serializeWithoutTypeInfo(String value) {
-			byte[] string = value.getBytes();
+			byte[] string = value.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			byte[] data = new byte[4 + string.length];
 			ByteBuffer.wrap(data).putInt(string.length).put(string);
 			return data;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index 5ff3572..30a728c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -12,6 +12,8 @@
  */
 package org.apache.flink.python.api.streaming.util;
 
+import org.apache.flink.configuration.ConfigConstants;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +32,7 @@ public class StreamPrinter extends Thread {
 	}
 
 	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
-		this.reader = new BufferedReader(new InputStreamReader(stream));
+		this.reader = new BufferedReader(new InputStreamReader(stream, ConfigConstants.DEFAULT_CHARSET));
 		this.wrapInException = wrapInException;
 		this.msg = msg;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 5246b94..42abd4c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
@@ -117,7 +118,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 			if (value.length == 0) {
 				frameworkID = Option.empty();
 			} else {
-				frameworkID = Option.apply(Protos.FrameworkID.newBuilder().setValue(new String(value)).build());
+				frameworkID = Option.apply(Protos.FrameworkID.newBuilder().setValue(new String(value,
+					ConfigConstants.DEFAULT_CHARSET)).build());
 			}
 
 			return frameworkID;
@@ -134,7 +136,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		synchronized (startStopLock) {
 			verifyIsRunning();
 
-			byte[] value = frameworkID.isDefined() ? frameworkID.get().getValue().getBytes() : new byte[0];
+			byte[] value = frameworkID.isDefined() ? frameworkID.get().getValue().getBytes(ConfigConstants.DEFAULT_CHARSET) :
+				new byte[0];
 			frameworkIdInZooKeeper.setValue(value);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 42fe6a5..113107f 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -36,6 +36,7 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -187,7 +188,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 	private void send(final String name, final String value) {
 		try {
 			String formatted = String.format("%s:%s|g", name, value);
-			byte[] data = formatted.getBytes();
+			byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
 			socket.send(new DatagramPacket(data, data.length, this.address));
 		}
 		catch (IOException e) {
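
For reference, the string formatted above follows the plain-text StatsD gauge line format, name:value|g, sent as one UDP datagram per metric. A minimal standalone sketch of that wire format (host, port, and metric name are placeholders, not taken from the diff):

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class StatsDGaugeSketch {
	public static void main(String[] args) throws Exception {
		// Placeholder endpoint; StatsD conventionally listens on UDP 8125.
		InetSocketAddress address = new InetSocketAddress("localhost", 8125);

		try (DatagramSocket socket = new DatagramSocket()) {
			// Gauge line format: <name>:<value>|g
			byte[] data = "flink.taskmanager.heap.used:42|g".getBytes(StandardCharsets.UTF_8);
			socket.send(new DatagramPacket(data, data.length, address));
		}
	}
}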

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index e53ef44..17fd65a 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -382,7 +382,7 @@ public class StatsDReporterTest extends TestLogger {
 
 					socket.receive(packet);
 
-					String line = new String(packet.getData(), 0, packet.getLength());
+					String line = new String(packet.getData(), 0, packet.getLength(), ConfigConstants.DEFAULT_CHARSET);
 
 					lines.put(line, obj);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index 585a2f3..d14b7a2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -48,7 +48,7 @@ import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
 import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;
 import io.netty.handler.codec.http.multipart.InterfaceHttpData;
 import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.File;
@@ -65,7 +65,7 @@ import java.util.UUID;
 @ChannelHandler.Sharable
 public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
 
-	private static final Charset ENCODING = Charset.forName("UTF-8");
+	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
 
 	/** A decoder factory that always stores POST chunks on disk */
 	private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
index b4788dd..85b3b13 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
@@ -26,9 +26,8 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.ExceptionUtils;
-
 import org.slf4j.Logger;
 
 /**
@@ -61,7 +60,8 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {
 	private void sendError(ChannelHandlerContext ctx, String error) {
 		if (ctx.channel().isActive()) {
 			DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-						HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(error.getBytes()));
+				HttpResponseStatus.INTERNAL_SERVER_ERROR,
+				Unpooled.wrappedBuffer(error.getBytes(ConfigConstants.DEFAULT_CHARSET)));
 
 			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
 			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 8bd58a3..3d7fbed 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -30,6 +30,7 @@ import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -59,7 +60,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);
 
-	private static final Charset ENCODING = Charset.forName("UTF-8");
+	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
 
 	public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
index 127efdb..53f9f04 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
@@ -29,8 +29,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
-
-import java.io.UnsupportedEncodingException;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Responder that returns a constant String.
@@ -41,12 +40,7 @@ public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
 	private final byte[] encodedText;
 
 	public ConstantTextHandler(String text) {
-		try {
-			this.encodedText = text.getBytes("UTF-8");
-		}
-		catch (UnsupportedEncodingException e) {
-			throw new RuntimeException(e.getMessage(), e);
-		}
+		this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
 	}
 
 	@Override
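
A side note on why the try/catch above could simply be deleted: String#getBytes(String charsetName) declares the checked UnsupportedEncodingException, while String#getBytes(Charset) cannot throw it. A tiny standalone sketch of the two overloads (illustrative class name):

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class GetBytesOverloads {
	public static void main(String[] args) throws UnsupportedEncodingException {
		String text = "hello";

		byte[] byName    = text.getBytes("UTF-8");                // checked exception in the signature
		byte[] byCharset = text.getBytes(StandardCharsets.UTF_8); // no checked exception possible

		System.out.println(byName.length + " == " + byCharset.length);
	}
}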

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index ca61ec1..6616a2a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
@@ -32,8 +33,6 @@ import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -94,9 +93,9 @@ public class HandlerRedirectUtils {
 		return redirectResponse;
 	}
 
-	public static HttpResponse getUnavailableResponse() throws UnsupportedEncodingException {
+	public static HttpResponse getUnavailableResponse() {
 		String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
-		byte[] bytes = result.getBytes(Charset.forName("UTF-8"));
+		byte[] bytes = result.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		HttpResponse unavailableResponse = new DefaultFullHttpResponse(
 				HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index b618d85..f5d6853 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -62,7 +62,7 @@ public class JobCancellationWithSavepointHandlers {
 	private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
 
 	/** Encodings for String. */
-	private static final Charset ENCODING = Charset.forName("UTF-8");
+	private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
 
 	/** Shared lock between Trigger and In-Progress handlers. */
 	private final Object lock = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 1002bf3..37ee814 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -46,6 +46,7 @@ import io.netty.handler.stream.ChunkedFile;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -345,7 +346,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 			response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 		}
 
-		byte[] buf = message.getBytes();
+		byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		ByteBuf b = Unpooled.copiedBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 1db1369..4177f44 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.Routed;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -139,7 +140,7 @@ public class TaskManagerLogHandlerTest {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 				ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
-				exception.set(new String(data.array()));
+				exception.set(new String(data.array(), ConfigConstants.DEFAULT_CHARSET));
 				return null;
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index b5ba565..c540f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
@@ -64,7 +65,7 @@ public class BlobUtils {
 	/**
 	 * The default character set to translate between characters and bytes.
 	 */
-	static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+	static final Charset DEFAULT_CHARSET = ConfigConstants.DEFAULT_CHARSET;
 
 	/**
 	 * Creates a BlobStore based on the parameters set in the configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
index 5184db8..08ec35e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -94,11 +95,12 @@ public class SavepointV1Test {
 				for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
 
 					StreamStateHandle nonPartitionableState =
-							new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes());
+							new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes(
+								ConfigConstants.DEFAULT_CHARSET));
 					StreamStateHandle operatorStateBackend =
-							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes());
+							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
 					StreamStateHandle operatorStateStream =
-							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes());
+							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
 					Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
 					offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
 					offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
@@ -127,13 +129,15 @@ public class SavepointV1Test {
 				if (hasKeyedBackend) {
 					keyedStateBackend = new KeyGroupsStateHandle(
 							new KeyGroupRangeOffsets(1, 1, new long[]{42}),
-							new TestByteStreamStateHandleDeepCompare("c", "Hello".getBytes()));
+							new TestByteStreamStateHandleDeepCompare("c", "Hello"
+								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
 				}
 
 				if (hasKeyedStream) {
 					keyedStateStream = new KeyGroupsStateHandle(
 							new KeyGroupRangeOffsets(1, 1, new long[]{23}),
-							new TestByteStreamStateHandleDeepCompare("d", "World".getBytes()));
+							new TestByteStreamStateHandleDeepCompare("d", "World"
+								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
 				}
 
 				taskState.putState(subtaskIdx, new SubtaskState(

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
index b5b4d76..74de096 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
@@ -19,12 +19,13 @@
 
 package org.apache.flink.runtime.io.network.api.serialization.types;
 
-import java.io.IOException;
-import java.util.Random;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.IOException;
+import java.util.Random;
+
 public class AsciiStringType implements SerializationTestType {
 
 	private static final int MAX_LEN = 1500;
@@ -54,7 +55,7 @@ public class AsciiStringType implements SerializationTestType {
 
 	@Override
 	public int length() {
-		return value.getBytes().length + 2;
+		return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f9aea89..1fd004f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
@@ -27,13 +27,12 @@ import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +45,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class DataSinkTaskTest extends TaskTestBase {
 	
@@ -468,7 +467,7 @@ public class DataSinkTaskTest extends TaskTestBase {
 			this.bld.append(value.getValue());
 			this.bld.append('\n');
 
-			byte[] bytes = this.bld.toString().getBytes();
+			byte[] bytes = this.bld.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 			this.stream.write(bytes);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 1ebdcb0..82f3d1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -247,7 +248,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		@Override
 		public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
 			
-			String line = new String(record, offset, numBytes);
+			String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
 			
 			try {
 				this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
@@ -290,7 +291,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 				return null;
 			}
 			
-			String line = new String(record, offset, numBytes);
+			String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
 			
 			try {
 				this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
@@ -324,7 +325,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 			
 			this.cnt++;
 			
-			String line = new String(record, offset, numBytes);
+			String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
 			
 			try {
 				this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 8617193..980075d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -107,7 +108,7 @@ public class FsCheckpointStateOutputStreamTest {
 
 		stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);
 
-		byte[] data = "testme!".getBytes();
+		byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		for (int i = 0; i < 7; ++i) {
 			Assert.assertEquals(i * (1 + data.length), stream.getPos());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 2de4c01..7f5a841 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Very simple serialization schema for strings.
@@ -31,7 +32,7 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
 
 	@Override
 	public String deserialize(byte[] message) {
-		return new String(message);
+		return new String(message, ConfigConstants.DEFAULT_CHARSET);
 	}
 
 	@Override
@@ -41,7 +42,7 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
 
 	@Override
 	public byte[] serialize(String element) {
-		return element.getBytes();
+		return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 23491da..877e707 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.TestLogger;
@@ -55,7 +56,7 @@ public class SocketClientSinkTest extends TestLogger {
 	private SerializationSchema<String> simpleSchema = new SerializationSchema<String>() {
 		@Override
 		public byte[] serialize(String element) {
-			return element.getBytes();
+			return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
 		}
 	};
 
@@ -300,4 +301,4 @@ public class SocketClientSinkTest extends TestLogger {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 90d8861..0ad42bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -340,7 +341,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 		for(int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 		return new Tuple2<>(tmp, str.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..8fdf74b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.misc;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParserTest;
+import org.apache.flink.types.parser.VarLengthStringParserTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckForbiddenMethodsUsage {
+
+	private static class ForbiddenCall {
+		private final Method method;
+		private final Constructor constructor;
+		private final List<Member> exclusions;
+
+		public Method getMethod() {
+			return method;
+		}
+
+		public List<Member> getExclusions() {
+			return exclusions;
+		}
+
+		private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
+			this.method = method;
+			this.exclusions = exclusions;
+			this.constructor = ctor;
+		}
+
+		public static ForbiddenCall of(Method method) {
+			return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Method method, List<Member> exclusions) {
+			return new ForbiddenCall(method, null, exclusions);
+		}
+
+		public static ForbiddenCall of(Constructor ctor) {
+			return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
+			return new ForbiddenCall(null, ctor, exclusions);
+		}
+
+		public Set<Member> getUsages(Reflections reflections) {
+			if (method == null) {
+				return reflections.getConstructorUsage(constructor);
+			}
+
+			return reflections.getMethodUsage(method);
+		}
+	}
+
+	private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+	@BeforeClass
+	public static void init() throws Exception {
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+			Lists.<Member>newArrayList(
+				FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+				FieldParserTest.class.getMethod("testDelimiterNext")
+			)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+	}
+
+	@Test
+	public void testNoDefaultEncoding() throws Exception {
+		final Reflections reflections = new Reflections(new ConfigurationBuilder()
+			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+			.addScanners(new MemberUsageScanner()));
+
+
+		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+			methodUsages.removeAll(forbiddenCall.getExclusions());
+			assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+		}
+	}
+}
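
The scan above is driven by the org.reflections library's MemberUsageScanner, which records call sites while scanning the classpath. A stripped-down sketch of the same lookup outside JUnit, using only API calls that appear in the diff (illustrative class name):

import org.reflections.Reflections;
import org.reflections.scanners.MemberUsageScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;

import java.lang.reflect.Member;
import java.util.Set;

public class FindDefaultEncodingCalls {
	public static void main(String[] args) throws Exception {
		Reflections reflections = new Reflections(new ConfigurationBuilder()
			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
			.addScanners(new MemberUsageScanner()));

		// Every call site of the charset-less String#getBytes() overload.
		Set<Member> usages = reflections.getMethodUsage(String.class.getMethod("getBytes"));
		for (Member usage : usages) {
			System.out.println("forbidden call in: " + usage);
		}
	}
}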


[9/9] flink git commit: [FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage

Posted by se...@apache.org.
[FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage

  - Move to manual tests package
  - Adjust order of methods (ctor, instance, class)
  - Replace Guava with Java Util
  - Make charset in SimpleStringSchema configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab91cc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab91cc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab91cc5

Branch: refs/heads/master
Commit: 3ab91cc5d233265b7c4a1497e7294c9065261dec
Parents: 53fedbd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 8 16:10:52 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../util/serialization/SimpleStringSchema.java  |  61 ++++++++-
 .../serialization/SimpleStringSchemaTest.java   |  51 ++++++++
 .../test/manual/CheckForbiddenMethodsUsage.java | 126 +++++++++++++++++++
 .../test/misc/CheckForbiddenMethodsUsage.java   | 115 -----------------
 4 files changed, 235 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 7f5a841..ddc55a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,19 +20,59 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigConstants;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Very simple serialization schema for strings.
+ * 
+ * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
  */
 @PublicEvolving
 public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
 
 	private static final long serialVersionUID = 1L;
 
+	/** The charset to use to convert between strings and bytes.
+	 * The field is transient because we serialize a different delegate object instead */
+	private transient Charset charset;
+
+	/**
+	 * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+	 */
+	public SimpleStringSchema() {
+		this(StandardCharsets.UTF_8);
+	}
+
+	/**
+	 * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
+	 * 
+	 * @param charset The charset to use to convert between strings and bytes.
+	 */
+	public SimpleStringSchema(Charset charset) {
+		this.charset = checkNotNull(charset);
+	}
+
+	/**
+	 * Gets the charset used by this schema for serialization.
+	 * @return The charset used by this schema for serialization.
+	 */
+	public Charset getCharset() {
+		return charset;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Kafka Serialization
+	// ------------------------------------------------------------------------
+
 	@Override
 	public String deserialize(byte[] message) {
-		return new String(message, ConfigConstants.DEFAULT_CHARSET);
+		return new String(message, charset);
 	}
 
 	@Override
@@ -42,11 +82,26 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
 
 	@Override
 	public byte[] serialize(String element) {
-		return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
+		return element.getBytes(charset);
 	}
 
 	@Override
 	public TypeInformation<String> getProducedType() {
 		return BasicTypeInfo.STRING_TYPE_INFO;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Java Serialization
+	// ------------------------------------------------------------------------
+
+	private void writeObject (ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeUTF(charset.name());
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		String charsetName = in.readUTF();
+		this.charset = Charset.forName(charsetName);
+	}
 }
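
Worth noting: java.nio.charset.Charset is not Serializable, which is why the field is transient and the schema stores only the charset name via writeUTF. A round-trip sketch of that pattern with plain java.io, assuming the SimpleStringSchema from the diff above is on the classpath:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class CharsetRoundTrip {
	public static void main(String[] args) throws Exception {
		SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE);

		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
			out.writeObject(schema); // writeObject() appends the charset *name* via writeUTF
		}

		try (ObjectInputStream in = new ObjectInputStream(
				new ByteArrayInputStream(bos.toByteArray()))) {
			SimpleStringSchema copy = (SimpleStringSchema) in.readObject();
			System.out.println(copy.getCharset()); // UTF-16LE, rebuilt in readObject()
		}
	}
}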

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
new file mode 100644
index 0000000..74b1d18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link SimpleStringSchema}.
+ */
+public class SimpleStringSchemaTest {
+
+	@Test
+	public void testSerializationWithAnotherCharset() {
+		final Charset charset = StandardCharsets.UTF_16BE;
+		final String string = "\u4e4b\u6383\u63cf\u53e4\u7c4d\u7248\u5be6\u4e43\u59da\u9f10\u7684";
+		final byte[] bytes = string.getBytes(charset);
+
+		assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string));
+		assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes));
+	}
+
+	@Test
+	public void testSerializability() throws Exception {
+		final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE);
+		final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema);
+
+		assertEquals(schema.getCharset(), copy.getCharset());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..aabe7c0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.manual;
+
+import org.apache.flink.types.parser.FieldParserTest;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests via reflection that certain methods are not called in Flink.
+ * 
+ * <p>Forbidden calls include:
+ *   - Byte / String conversions that do not specify an explicit charset
+ *     because they produce different results in different locales
+ */
+public class CheckForbiddenMethodsUsage {
+
+	private static class ForbiddenCall {
+
+		private final Method method;
+		private final Constructor<?> constructor;
+		private final List<Member> exclusions;
+
+		private ForbiddenCall(Method method, Constructor<?> ctor, List<Member> exclusions) {
+			this.method = method;
+			this.exclusions = exclusions;
+			this.constructor = ctor;
+		}
+
+		public Method getMethod() {
+			return method;
+		}
+
+		public List<Member> getExclusions() {
+			return exclusions;
+		}
+
+		public Set<Member> getUsages(Reflections reflections) {
+			if (method == null) {
+				return reflections.getConstructorUsage(constructor);
+			}
+
+			return reflections.getMethodUsage(method);
+		}
+
+		public static ForbiddenCall of(Method method) {
+			return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Method method, List<Member> exclusions) {
+			return new ForbiddenCall(method, null, exclusions);
+		}
+
+		public static ForbiddenCall of(Constructor<?> ctor) {
+			return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+		}
+
+		public static ForbiddenCall of(Constructor<?> ctor, List<Member> exclusions) {
+			return new ForbiddenCall(null, ctor, exclusions);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+	@BeforeClass
+	public static void init() throws Exception {
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+			Arrays.<Member>asList(
+				FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+				FieldParserTest.class.getMethod("testDelimiterNext")
+			)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+	}
+
+	@Test
+	public void testNoDefaultEncoding() throws Exception {
+		final Reflections reflections = new Reflections(new ConfigurationBuilder()
+			.useParallelExecutor(Runtime.getRuntime().availableProcessors())
+			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+			.addScanners(new MemberUsageScanner()));
+
+		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+			methodUsages.removeAll(forbiddenCall.getExclusions());
+		assertEquals("Unexpected calls: " + methodUsages, 0, methodUsages.size());
+		}
+	}
+}
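
The exclusion list registered in init() is the escape hatch: a call site that
legitimately needs the platform default charset can be whitelisted instead of
failing the build. A hypothetical registration (the test class and method named
here are made up for illustration) would look like:

    // Sketch only: whitelist a test that exercises the default charset on purpose.
    forbiddenCalls.add(ForbiddenCall.of(
        String.class.getMethod("getBytes"),
        Arrays.<Member>asList(
            DefaultCharsetBehaviourTest.class.getMethod("testDefaultCharsetOnPurpose"))));

Every usage the MemberUsageScanner finds that is not excluded fails the
assertion in testNoDefaultEncoding().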

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
deleted file mode 100644
index 8fdf74b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.misc;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FieldParserTest;
-import org.apache.flink.types.parser.VarLengthStringParserTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.MemberUsageScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Member;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class CheckForbiddenMethodsUsage {
-
-	private static class ForbiddenCall {
-		private final Method method;
-		private final Constructor constructor;
-		private final List<Member> exclusions;
-
-		public Method getMethod() {
-			return method;
-		}
-
-		public List<Member> getExclusions() {
-			return exclusions;
-		}
-
-		private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
-			this.method = method;
-			this.exclusions = exclusions;
-			this.constructor = ctor;
-		}
-
-		public static ForbiddenCall of(Method method) {
-			return new ForbiddenCall(method, null, Collections.<Member>emptyList());
-		}
-
-		public static ForbiddenCall of(Method method, List<Member> exclusions) {
-			return new ForbiddenCall(method, null, exclusions);
-		}
-
-		public static ForbiddenCall of(Constructor ctor) {
-			return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
-		}
-
-		public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
-			return new ForbiddenCall(null, ctor, exclusions);
-		}
-
-		public Set<Member> getUsages(Reflections reflections) {
-			if (method == null) {
-				return reflections.getConstructorUsage(constructor);
-			}
-
-			return reflections.getMethodUsage(method);
-		}
-	}
-
-	private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
-
-	@BeforeClass
-	public static void init() throws Exception {
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
-			Lists.<Member>newArrayList(
-				FieldParserTest.class.getMethod("testEndsWithDelimiter"),
-				FieldParserTest.class.getMethod("testDelimiterNext")
-			)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
-		forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
-	}
-
-	@Test
-	public void testNoDefaultEncoding() throws Exception {
-		final Reflections reflections = new Reflections(new ConfigurationBuilder()
-			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
-			.addScanners(new MemberUsageScanner()));
-
-
-		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
-			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
-			methodUsages.removeAll(forbiddenCall.getExclusions());
-			assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
-		}
-	}
-}


[3/9] flink git commit: [FLINK-5598] [web frontend] Return filename after jar upload

Posted by se...@apache.org.
[FLINK-5598] [web frontend] Return filename after jar upload

This closes #3469


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84afd068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84afd068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84afd068

Branch: refs/heads/master
Commit: 84afd068ac229e4d598fffdf10075e9d356fbe07
Parents: 59f0f7a
Author: Fabian Wollert <da...@gmail.com>
Authored: Fri Mar 3 17:40:02 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/handlers/JarUploadHandler.java     | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84afd068/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 3d7cb8a..ec8516d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -59,10 +59,11 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 				return "{\"error\": \"Only Jar files are allowed.\"}";
 			}
 			
-			File newFile = new File(jarDir, UUID.randomUUID() + "_" + filename);
+			String filenameWithUUID = UUID.randomUUID() + "_" + filename;
+			File newFile = new File(jarDir, filenameWithUUID);
 			if (tempFile.renameTo(newFile)) {
 				// all went well
-				return "{}";
+				return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
 			}
 			else {
 				//noinspection ResultOfMethodCallIgnored
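
With this change a successful upload no longer returns an empty JSON object but
the name under which the jar was stored, so clients can reference it in later
requests. A minimal sketch of consuming the response (uploadJar() is a
hypothetical helper that returns the handler's JSON body as a String):

    String response = uploadJar(jarFile);
    // On success: {"status": "success", "filename": "<uuid>_myjob.jar"}
    if (response.contains("\"status\": \"success\"")) {
        String storedName = response.replaceAll(".*\"filename\": \"([^\"]+)\".*", "$1");
        // storedName is the UUID-prefixed name to use when addressing the jar later
    }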


[8/9] flink git commit: [FLINK-5135] [runtime] Expand the fields in ResourceProfile based on ResourceSpec

Posted by se...@apache.org.
[FLINK-5135] [runtime] Expand the fields in ResourceProfile based on ResourceSpec

This closes #3457


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2592a19c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2592a19c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2592a19c

Branch: refs/heads/master
Commit: 2592a19ccd4f83e4f57a835b03c7846d6edde927
Parents: 527eabd
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Thu Mar 2 18:23:42 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java | 104 +++++++++++++++----
 .../taskexecutor/TaskManagerServices.java       |   2 +-
 .../types/ResourceProfileTest.java              |   8 +-
 .../slotmanager/SlotManagerTest.java            |   2 +-
 .../taskexecutor/TaskExecutorITCase.java        |   2 +-
 5 files changed, 91 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ddc7547..faa93e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -36,27 +36,54 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1L);
+	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
 
 	// ------------------------------------------------------------------------
 
 	/** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
 	private final double cpuCores;
 
-	/** How many memory in mb are needed */
-	private final long memoryInMB;
+	/** How much heap memory in MB is needed */
+	private final int heapMemoryInMB;
+
+	/** How much direct memory in MB is needed */
+	private final int directMemoryInMB;
+
+	/** How much native memory in MB is needed */
+	private final int nativeMemoryInMB;
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new ResourceProfile.
-	 * 
-	 * @param cpuCores   The number of CPU cores (possibly fractional, i.e., 0.2 cores)
-	 * @param memoryInMB The size of the memory, in megabytes.
+	 *
+	 * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
+	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
+	 * @param directMemoryInMB The size of the direct memory, in megabytes.
+	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 */
-	public ResourceProfile(double cpuCores, long memoryInMB) {
+	public ResourceProfile(
+			double cpuCores,
+			int heapMemoryInMB,
+			int directMemoryInMB,
+			int nativeMemoryInMB) {
 		this.cpuCores = cpuCores;
-		this.memoryInMB = memoryInMB;
+		this.heapMemoryInMB = heapMemoryInMB;
+		this.directMemoryInMB = directMemoryInMB;
+		this.nativeMemoryInMB = nativeMemoryInMB;
+	}
+
+	/**
+	 * Creates a new simple ResourceProfile used for testing.
+	 *
+	 * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
+	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
+	 */
+	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
+		this.cpuCores = cpuCores;
+		this.heapMemoryInMB = heapMemoryInMB;
+		this.directMemoryInMB = 0;
+		this.nativeMemoryInMB = 0;
 	}
 
 	/**
@@ -66,7 +93,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 */
 	public ResourceProfile(ResourceProfile other) {
 		this.cpuCores = other.cpuCores;
-		this.memoryInMB = other.memoryInMB;
+		this.heapMemoryInMB = other.heapMemoryInMB;
+		this.directMemoryInMB = other.directMemoryInMB;
+		this.nativeMemoryInMB = other.nativeMemoryInMB;
 	}
 
 	// ------------------------------------------------------------------------
@@ -80,11 +109,35 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
-	 * Get the memory needed in MB
-	 * @return The memory in MB
+	 * Get the heap memory needed in MB
+	 * @return The heap memory in MB
+	 */
+	public int getHeapMemoryInMB() {
+		return heapMemoryInMB;
+	}
+
+	/**
+	 * Get the direct memory needed in MB
+	 * @return The direct memory in MB
+	 */
+	public int getDirectMemoryInMB() {
+		return directMemoryInMB;
+	}
+
+	/**
+	 * Get the native memory needed in MB
+	 * @return The native memory in MB
+	 */
+	public int getNativeMemoryInMB() {
+		return nativeMemoryInMB;
+	}
+
+	/**
+	 * Get the total memory needed in MB
+	 * @return The total memory in MB
 	 */
-	public long getMemoryInMB() {
-		return memoryInMB;
+	public int getMemoryInMB() {
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
 	}
 
 	/**
@@ -94,22 +147,29 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @return true if the requirement is matched, otherwise false
 	 */
 	public boolean isMatching(ResourceProfile required) {
-		return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
+		return cpuCores >= required.getCpuCores() &&
+				heapMemoryInMB >= required.getHeapMemoryInMB() &&
+				directMemoryInMB >= required.getDirectMemoryInMB() &&
+				nativeMemoryInMB >= required.getNativeMemoryInMB();
 	}
 
 	@Override
 	public int compareTo(@Nonnull ResourceProfile other) {
-		int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB);
+		int cmp1 = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
 		int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
-		return (cmp1 != 0) ? cmp1 : cmp2; 
+		return (cmp1 != 0) ? cmp1 : cmp2;
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
-		long cpuBits = Double.doubleToRawLongBits(cpuCores);
-		return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32));
+		final long cpuBits = Double.doubleToLongBits(cpuCores);
+		int result = (int) (cpuBits ^ (cpuBits >>> 32));
+		result = 31 * result + heapMemoryInMB;
+		result = 31 * result + directMemoryInMB;
+		result = 31 * result + nativeMemoryInMB;
+		return result;
 	}
 
 	@Override
@@ -119,7 +179,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		}
 		else if (obj != null && obj.getClass() == ResourceProfile.class) {
 			ResourceProfile that = (ResourceProfile) obj;
-			return this.cpuCores == that.cpuCores && this.memoryInMB == that.memoryInMB; 
+			return this.cpuCores == that.cpuCores &&
+					this.heapMemoryInMB == that.heapMemoryInMB &&
+					this.directMemoryInMB == that.directMemoryInMB &&
+					this.nativeMemoryInMB == that.nativeMemoryInMB;
 		}
 		else {
 			return false;
@@ -130,7 +192,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	public String toString() {
 		return "ResourceProfile{" +
 			"cpuCores=" + cpuCores +
-			", memoryInMB=" + memoryInMB +
+			", heapMemoryInMB=" + heapMemoryInMB +
+			", directMemoryInMB=" + directMemoryInMB +
+			", nativeMemoryInMB=" + nativeMemoryInMB +
 			'}';
 	}
 }
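
Taken together, a profile now matches a requirement only if every dimension is
sufficient, and the total memory is the sum of the three parts. A small sketch
using only the constructors and accessors from the diff above:

    ResourceProfile offered  = new ResourceProfile(2.0, 512, 128, 64);
    ResourceProfile required = new ResourceProfile(1.0, 256, 128, 0);

    offered.isMatching(required);  // true: every dimension satisfies the requirement
    required.isMatching(offered);  // false: 1.0 CPU cores do not cover 2.0
    offered.getMemoryInMB();       // 704 = 512 + 128 + 64 (heap + direct + native)

    // The two-argument constructor is meant for tests; direct/native default to 0:
    ResourceProfile simple = new ResourceProfile(1.0, 42);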

http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ae5a383..19c5c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -201,7 +201,7 @@ public class TaskManagerServices {
 		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
 
 		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-			resourceProfiles.add(new ResourceProfile(1.0, 42L));
+			resourceProfiles.add(new ResourceProfile(1.0, 42));
 		}
 
 		final TimerService<AllocationID> timerService = new TimerService<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index cd1d895..aacdcfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -27,10 +27,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100);
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200);
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100);
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200);
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100);
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200);
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100);
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200);
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));

http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 948c129..041747d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -53,7 +53,7 @@ public class SlotManagerTest {
 
 	private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
 
-	private static final long DEFAULT_TESTING_MEMORY = 512;
+	private static final int DEFAULT_TESTING_MEMORY = 512;
 
 	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
 		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 36fd65b..0f884f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -86,7 +86,7 @@ public class TaskExecutorITCase {
 		final String jmAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();
 		final JobID jobId = new JobID();
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1L);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 
 		testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 		testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);


[2/9] flink git commit: [FLINK-5824] Fix String/byte conversions without explicit encoding

Posted by se...@apache.org.
[FLINK-5824] Fix String/byte conversions without explicit encoding

This closes #3468


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fedbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fedbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fedbd2

Branch: refs/heads/master
Commit: 53fedbd2894c6c7b839d8fdcc0dbf1e6e21e631a
Parents: 84afd06
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Fri Mar 3 13:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |   9 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |   5 +-
 .../testutils/ZooKeeperStringSerializer.java    |   9 +-
 .../manualtests/ManualConsumerProducerTest.java |   3 +-
 ...nualExactlyOnceWithStreamReshardingTest.java |   4 +-
 .../kinesis/manualtests/ManualProducerTest.java |   3 +-
 .../testutils/FakeKinesisBehavioursFactory.java |   5 +-
 .../nifi/examples/NiFiSinkTopologyExample.java  |   4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      |   7 +-
 .../typeutils/runtime/StringArrayWritable.java  |   5 +-
 .../hbase/example/HBaseFlinkTestConstants.java  |   6 +-
 .../addons/hbase/example/HBaseWriteExample.java |   5 +-
 .../state/RocksDBKeyedStateBackend.java         |   8 +-
 .../state/RocksDBMergeIteratorTest.java         |   3 +-
 .../flink/storm/wrappers/StormTupleTest.java    |   5 +-
 .../io/SimpleTweetInputFormat.java              |   2 +-
 .../api/common/io/GenericCsvInputFormat.java    |   6 +-
 .../flink/configuration/ConfigConstants.java    |   7 ++
 .../memory/ByteArrayOutputStreamWithPos.java    |   3 +-
 .../apache/flink/types/parser/BigIntParser.java |   5 +-
 .../flink/types/parser/BooleanParser.java       |   9 +-
 .../apache/flink/types/parser/DoubleParser.java |   5 +-
 .../flink/types/parser/DoubleValueParser.java   |   3 +-
 .../apache/flink/types/parser/FloatParser.java  |   5 +-
 .../flink/types/parser/FloatValueParser.java    |   3 +-
 .../flink/types/parser/SqlDateParser.java       |   5 +-
 .../flink/types/parser/SqlTimeParser.java       |   5 +-
 .../flink/types/parser/SqlTimestampParser.java  |   5 +-
 .../api/common/io/DelimitedInputFormatTest.java |   7 +-
 .../api/common/io/FileInputFormatTest.java      |   7 +-
 .../common/state/ValueStateDescriptorTest.java  |   2 +-
 .../runtime/kryo/KryoClearedBufferTest.java     |   8 +-
 .../flink/types/parser/ParserTestBase.java      |  21 ++--
 .../types/parser/VarLengthStringParserTest.java |  23 ++--
 .../socket/SocketWindowWordCountITCase.java     |   7 +-
 .../ContinuousFileProcessingITCase.java         |   3 +-
 .../ContinuousFileProcessingMigrationTest.java  |   3 +-
 .../hdfstests/ContinuousFileProcessingTest.java |   3 +-
 .../org/apache/flink/hdfstests/HDFSTest.java    |   5 +-
 .../flink/api/java/io/PrimitiveInputFormat.java |   2 +-
 .../flink/api/java/io/RowCsvInputFormat.java    |   2 +-
 .../flink/api/java/io/CsvInputFormatTest.java   |   6 +-
 .../api/java/io/RowCsvInputFormatTest.java      |   3 +-
 .../functions/util/StringDeserializerMap.java   |   3 +-
 .../util/StringTupleDeserializerMap.java        |   3 +-
 .../api/streaming/data/PythonStreamer.java      |  34 +++---
 .../api/streaming/plan/PythonPlanReceiver.java  |   4 +-
 .../api/streaming/plan/PythonPlanStreamer.java  |   9 +-
 .../api/streaming/util/SerializationUtils.java  |   8 +-
 .../api/streaming/util/StreamPrinter.java       |   4 +-
 .../store/ZooKeeperMesosWorkerStore.java        |   7 +-
 .../flink/metrics/statsd/StatsDReporter.java    |   3 +-
 .../metrics/statsd/StatsDReporterTest.java      |   2 +-
 .../runtime/webmonitor/HttpRequestHandler.java  |   4 +-
 .../webmonitor/PipelineErrorHandler.java        |   6 +-
 .../webmonitor/RuntimeMonitorHandler.java       |   3 +-
 .../handlers/ConstantTextHandler.java           |  10 +-
 .../handlers/HandlerRedirectUtils.java          |   7 +-
 .../JobCancellationWithSavepointHandlers.java   |   2 +-
 .../handlers/TaskManagerLogHandler.java         |   3 +-
 .../handlers/TaskManagerLogHandlerTest.java     |   3 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   3 +-
 .../checkpoint/savepoint/SavepointV1Test.java   |  14 ++-
 .../serialization/types/AsciiStringType.java    |   9 +-
 .../runtime/operators/DataSinkTaskTest.java     |   9 +-
 .../runtime/operators/DataSourceTaskTest.java   |   7 +-
 .../FsCheckpointStateOutputStreamTest.java      |   3 +-
 .../util/serialization/SimpleStringSchema.java  |   5 +-
 .../functions/sink/SocketClientSinkTest.java    |   5 +-
 ...ontinuousFileProcessingCheckpointITCase.java |   3 +-
 .../test/misc/CheckForbiddenMethodsUsage.java   | 115 +++++++++++++++++++
 71 files changed, 359 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 600d1e5..1db45a5 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -25,13 +25,14 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import java.io.IOException;
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
 	/**
@@ -154,7 +155,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		}
 
 		if(userDefinedSchema != null) {
-			byte[] json = userDefinedSchema.toString().getBytes();
+			byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(json.length);
 			out.write(json);
 		} else {
@@ -175,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			byte[] json = new byte[length];
 			in.readFully(json);
 
-			Schema schema = new Schema.Parser().parse(new String(json));
+			Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
 			setSchema(schema);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index cec980f..c02c2cb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class ZookeeperOffsetHandler {
 		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
 		String path = topicDirs.consumerOffsetDir() + "/" + partition;
 		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		byte[] data = Long.toString(offset).getBytes();
+		byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
 		curatorClient.setData().forPath(path, data);
 	}
 
@@ -133,7 +134,7 @@ public class ZookeeperOffsetHandler {
 		if (data == null) {
 			return null;
 		} else {
-			String asString = new String(data);
+			String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
 			if (asString.length() == 0) {
 				return null;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 8a4c408..37ed408 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -19,20 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Simple ZooKeeper serializer for Strings.
  */
 public class ZooKeeperStringSerializer implements ZkSerializer {
 
-	private static final Charset CHARSET = Charset.forName("UTF-8");
-	
 	@Override
 	public byte[] serialize(Object data) {
 		if (data instanceof String) {
-			return ((String) data).getBytes(CHARSET);
+			return ((String) data).getBytes(ConfigConstants.DEFAULT_CHARSET);
 		}
 		else {
 			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
@@ -45,7 +42,7 @@ public class ZooKeeperStringSerializer implements ZkSerializer {
 			return null;
 		}
 		else {
-			return new String(bytes, CHARSET);
+			return new String(bytes, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 6e02a55..63c6c2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
@@ -62,7 +63,7 @@ public class ManualConsumerProducerTest {
 				new KinesisSerializationSchema<String>() {
 					@Override
 					public ByteBuffer serialize(String element) {
-						return ByteBuffer.wrap(element.getBytes());
+						return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
 					}
 
 					// every 10th element goes into a different stream

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 6abea2a..71bcae3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
@@ -119,7 +119,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 								}
 								batch.add(
 									new PutRecordsRequestEntry()
-										.withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
+										.withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes(ConfigConstants.DEFAULT_CHARSET)))
 										.withPartitionKey(UUID.randomUUID().toString()));
 							}
 							count += batchSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 35e9ef6..1df717c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
@@ -59,7 +60,7 @@ public class ManualProducerTest {
 				new KinesisSerializationSchema<String>() {
 					@Override
 					public ByteBuffer serialize(String element) {
-						return ByteBuffer.wrap(element.getBytes());
+						return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
 					}
 
 					// every 10th element goes into a different stream

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 964ee76..b62e7de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -21,14 +21,15 @@ import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -196,7 +197,7 @@ public class FakeKinesisBehavioursFactory {
 			for (int i = min; i < max; i++) {
 				batch.add(
 					new Record()
-						.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+						.withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
 						.withPartitionKey(UUID.randomUUID().toString())
 						.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
 						.withSequenceNumber(String.valueOf(i)));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
index 572f949..202e80a 100644
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.nifi.examples;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
@@ -45,7 +46,8 @@ public class NiFiSinkTopologyExample {
 				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
 					@Override
 					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
-						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+						return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
+							new HashMap<String,String>());
 					}
 				}));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 26434ed..b65ddf0 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -45,7 +47,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.modules.junit4.PowerMockRunner;
-import com.rabbitmq.client.Connection;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -323,7 +324,7 @@ public class RMQSourceTest {
 			} catch (InterruptedException e) {
 				e.printStackTrace();
 			}
-			return new String(message);
+			return new String(message, ConfigConstants.DEFAULT_CHARSET);
 		}
 
 		@Override
@@ -365,7 +366,7 @@ public class RMQSourceTest {
 
 			// Mock for delivery
 			final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
-			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes(ConfigConstants.DEFAULT_CHARSET));
 
 			try {
 				Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index c32f5da..8c3a8cd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.hadoop.io.Writable;
 
 import java.io.DataInput;
@@ -41,7 +42,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
 		out.writeInt(this.array.length);
 		
 		for(String str : this.array) {
-			byte[] b = str.getBytes();
+			byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(b.length);
 			out.write(b);
 		}
@@ -54,7 +55,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
 		for(int i = 0; i < this.array.length; i++) {
 			byte[] b = new byte[in.readInt()];
 			in.readFully(b);
-			this.array[i] = new String(b);
+			this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index 8579dee..f56295e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.addons.hbase.example;
 
+import org.apache.flink.configuration.ConfigConstants;
+
 public class HBaseFlinkTestConstants {
 	
-	public static final byte[] CF_SOME = "someCf".getBytes();
-	public static final byte[] Q_SOME = "someQual".getBytes();
+	public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+	public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
 	public static final String TEST_TABLE_NAME = "test-table";
 	public static final String TMP_DIR = "/tmp/test";
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 483bdff..64d20c3 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -87,7 +88,7 @@ public class HBaseWriteExample {
 			@Override
 			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
 				reuse.f0 = new Text(t.f0);
-				Put put = new Put(t.f0.getBytes());
+				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
 				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
 				reuse.f1 = put;
 				return reuse;
@@ -199,4 +200,4 @@ public class HBaseWriteExample {
 		"The fair Ophelia!--Nymph, in thy orisons",
 		"Be all my sins remember'd."
 	};
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bd8d4dd..eb926c0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
@@ -171,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
 		// RocksDB seems to need this...
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
 		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
 		try {
 
@@ -727,7 +728,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				if (null == columnFamily) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-							metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+						metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						rocksDBKeyedStateBackend.columnOptions);
 
 					RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
 							new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
@@ -824,7 +826,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
-				descriptor.getName().getBytes(), columnOptions);
+				descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 1cb3b2b..956ef2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Assert;
@@ -71,7 +72,7 @@ public class RocksDBMergeIteratorTest {
 
 			for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
 				ColumnFamilyHandle handle = rocksDB.createColumnFamily(
-						new ColumnFamilyDescriptor(("column-" + c).getBytes()));
+						new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
 
 				ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
 				DataOutputStream dos = new DataOutputStream(bos);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index 7ea4b76..5e6c160 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.MessageId;
@@ -188,7 +189,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testString() {
 		final byte[] data = new byte[this.r.nextInt(15)];
 		this.r.nextBytes(data);
-		final String flinkTuple = new String(data);
+		final String flinkTuple = new String(data, ConfigConstants.DEFAULT_CHARSET);
 
 		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null, -1, null, null,
 				null);
@@ -304,7 +305,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testStringTuple() {
 		final byte[] rawdata = new byte[this.r.nextInt(15)];
 		this.r.nextBytes(rawdata);
-		final String data = new String(rawdata);
+		final String data = new String(rawdata, ConfigConstants.DEFAULT_CHARSET);
 
 		final int index = this.r.nextInt(5);
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
index a72fc14..f7f1bde 100644
--- a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -90,4 +90,4 @@ public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implemen
 	public TypeInformation<Tweet> getProducedType() {
 		return new GenericTypeInfo<Tweet>(Tweet.class);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index b934d41..bddaec9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -362,7 +362,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 				if (lenient) {
 					return false;
 				} else {
-					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
 				}
 			}
 
@@ -380,7 +380,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 					if (lenient) {
 						return false;
 					} else {
-						String lineAsString = new String(bytes, offset, numBytes);
+						String lineAsString = new String(bytes, offset, numBytes, getCharset());
 						throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
 								+ "ParserError " + parser.getErrorState() + " \n"
 								+ "Expect field types: "+fieldTypesToString() + " \n"
@@ -405,7 +405,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 				startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
 				if (startPos < 0) {
 					if (!lenient) {
-						String lineAsString = new String(bytes, offset, numBytes);
+						String lineAsString = new String(bytes, offset, numBytes, getCharset());
 						throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
 								+ "Expect field types: "+fieldTypesToString()+" \n"
 								+ "in file: "+filePath);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5129f20..c7c8b1a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,9 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
@@ -1428,6 +1431,10 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the Flink installation root directory */
 	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
+	// ---------------------------- Encoding ------------------------------
+
+	public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
 	/**
 	 * Not instantiable.
 	 */

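As a note on the pattern: every hunk in this commit replaces a platform-default String/byte conversion with an explicit charset, and ConfigConstants.DEFAULT_CHARSET pins that charset to UTF-8. A minimal sketch of the resulting round trip (the CharsetRoundTrip class is illustrative, not part of the commit):

import org.apache.flink.configuration.ConfigConstants;

public class CharsetRoundTrip {
	public static void main(String[] args) {
		// The no-argument getBytes()/new String(byte[]) overloads use the
		// JVM's platform default charset, which varies between machines.
		// Passing the charset explicitly makes the byte layout deterministic.
		byte[] bytes = "löwe".getBytes(ConfigConstants.DEFAULT_CHARSET);
		String roundTripped = new String(bytes, ConfigConstants.DEFAULT_CHARSET);
		System.out.println(roundTripped.equals("löwe")); // true on every platform
	}
}
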
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index ddfd30a..abf65b1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -97,7 +98,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 	}
 
 	public String toString() {
-		return new String(buffer, 0, count);
+		return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
 	}
 
 	private int getEndPosition() {

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 11e459a..4e1aa3e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.math.BigInteger;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link java.math.BigInteger}.
@@ -45,7 +46,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = new BigInteger(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return new BigInteger(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index f8b890a..908c05f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -19,6 +19,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 @PublicEvolving
 public class BooleanParser extends FieldParser<Boolean> {
@@ -27,12 +28,12 @@ public class BooleanParser extends FieldParser<Boolean> {
 
 	/** Values for true and false respectively. Must be lower case. */
 	private static final byte[][] TRUE = new byte[][] {
-			"true".getBytes(),
-			"1".getBytes()
+			"true".getBytes(ConfigConstants.DEFAULT_CHARSET),
+			"1".getBytes(ConfigConstants.DEFAULT_CHARSET)
 	};
 	private static final byte[][] FALSE = new byte[][] {
-			"false".getBytes(),
-			"0".getBytes()
+			"false".getBytes(ConfigConstants.DEFAULT_CHARSET),
+			"0".getBytes(ConfigConstants.DEFAULT_CHARSET)
 	};
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 2474adf..409cff2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a Double.
@@ -44,7 +45,7 @@ public class DoubleParser extends FieldParser<Double> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Double.parseDouble(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -101,7 +102,7 @@ public class DoubleParser extends FieldParser<Double> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Double.parseDouble(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 10b43c3..8f64691 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.DoubleValue;
 
 /**
@@ -43,7 +44,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			double value = Double.parseDouble(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index e76484e..5636a4e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Float}.
@@ -42,7 +43,7 @@ public class FloatParser extends FieldParser<Float> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Float.parseFloat(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -99,7 +100,7 @@ public class FloatParser extends FieldParser<Float> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Float.parseFloat(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index a834f22..83fe63f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.FloatValue;
 
 /**
@@ -43,7 +44,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			float value = Float.parseFloat(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
index 859dcf8..24374b8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Date;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link java.sql.Date}.
@@ -45,7 +46,7 @@ public class SqlDateParser extends FieldParser<Date> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Date.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlDateParser extends FieldParser<Date> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Date.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
index fbddadc..363cbb9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Time;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Time}.
@@ -39,7 +40,7 @@ public class SqlTimeParser extends FieldParser<Time> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Time.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -96,7 +97,7 @@ public class SqlTimeParser extends FieldParser<Time> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Time.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
index 0bcb602..97443a5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Timestamp;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Timestamp}.
@@ -45,7 +46,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Timestamp.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Timestamp.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 7ce0a2e..2ff5ee7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -73,11 +74,11 @@ public class DelimitedInputFormatTest {
 		cfg.setString("delimited-format.delimiter", "\n");
 		
 		format.configure(cfg);
-		assertEquals("\n", new String(format.getDelimiter()));
+		assertEquals("\n", new String(format.getDelimiter(), format.getCharset()));
 
 		cfg.setString("delimited-format.delimiter", "&-&");
 		format.configure(cfg);
-		assertEquals("&-&", new String(format.getDelimiter()));
+		assertEquals("&-&", new String(format.getDelimiter(), format.getCharset()));
 	}
 	
 	@Test
@@ -428,7 +429,7 @@ public class DelimitedInputFormatTest {
 		
 		@Override
 		public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
-			return new String(bytes, offset, numBytes);
+			return new String(bytes, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5599dd0..dfda372 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -269,7 +270,7 @@ public class FileInputFormatTest {
 			File luigiFile = temporaryFolder.newFile("_luigi");
 			File success = temporaryFolder.newFile("_SUCCESS");
 
-			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+			createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2, luigiFile, success);
 
 			// test that only the valid files are accepted
 			
@@ -308,7 +309,7 @@ public class FileInputFormatTest {
 
 			File[] files = { child1, child2 };
 
-			createTempFiles(contents.getBytes(), files);
+			createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), files);
 
 			// test that only the valid files are accepted
 
@@ -345,7 +346,7 @@ public class FileInputFormatTest {
 
 		File child1 = temporaryFolder.newFile("dataFile1.txt");
 		File child2 = temporaryFolder.newFile("another_file.bin");
-		createTempFiles(contents.getBytes(), child1, child2);
+		createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2);
 
 		// test that only the valid files are accepted
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 655ffd5..674f7e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -113,7 +113,7 @@ public class ValueStateDescriptorTest {
 		}
 		data[199000] = '\0';
 
-		String defaultValue = new String(data);
+		String defaultValue = new String(data, ConfigConstants.DEFAULT_CHARSET);
 
 		ValueStateDescriptor<String> descr =
 				new ValueStateDescriptor<String>("testName", serializer, defaultValue);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index 7572408..d85ff95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +36,7 @@ import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 public class KryoClearedBufferTest {
@@ -262,7 +264,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeBytes(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;
@@ -270,7 +272,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeChars(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;
@@ -278,7 +280,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeUTF(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 9b02147..51ace12 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -85,9 +86,9 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				FieldParser<T> parser2 = getParser();
 				FieldParser<T> parser3 = getParser();
 				
-				byte[] bytes1 = testValues[i].getBytes();
-				byte[] bytes2 = testValues[i].getBytes();
-				byte[] bytes3 = testValues[i].getBytes();
+				byte[] bytes1 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes2 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes3 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 				int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
 				int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&'}, parser2.createValue());
@@ -132,8 +133,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				String testVal1 = testValues[i] + "|";
 				String testVal2 = testValues[i] + "&&&&";
 
-				byte[] bytes1 = testVal1.getBytes();
-				byte[] bytes2 = testVal2.getBytes();
+				byte[] bytes1 = testVal1.getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes2 = testVal2.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 				int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
 				int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				
 				FieldParser<T> parser = getParser();
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 				
 				assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
@@ -318,7 +319,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			
 			for (int i = 0; i < testValues.length; i++) {
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				
 				
 				T result;
@@ -355,7 +356,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			
 			for (int i = 0; i < testValues.length; i++) {
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				
 				try {
 					parseMethod.invoke(null, bytes, 0, bytes.length, '|');
@@ -389,7 +390,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 		for (int i = 0; i < values.length; i++) {
 			String s = values[i];
 			
-			byte[] bytes = s.getBytes();
+			byte[] bytes = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			int numBytes = bytes.length;
 			System.arraycopy(bytes, 0, result, currPos, numBytes);
 			currPos += numBytes;
@@ -411,7 +412,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			FieldParser<T> parser = getParser();
 
 			for (String emptyString : emptyStrings) {
-				byte[] bytes = emptyString.getBytes();
+				byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
 				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 
 				assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 718274e..e6e6c62 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.types.parser;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
@@ -45,7 +46,7 @@ public class VarLengthStringParserTest {
 		this.parser = new StringValueParser();
 		
 		// check valid strings without whitespaces and trailing delimiter
-		byte[] recBytes = "abcdefgh|i|jklmno|".getBytes();
+		byte[] recBytes = "abcdefgh|i|jklmno|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 		
 		int startPos = 0;
@@ -63,14 +64,14 @@ public class VarLengthStringParserTest {
 		
 		
 		// check single field not terminated
-		recBytes = "abcde".getBytes();
+		recBytes = "abcde".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 5);
 		assertTrue(s.getValue().equals("abcde"));
 		
 		// check last field not terminated
-		recBytes = "abcde|fg".getBytes();
+		recBytes = "abcde|fg".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 6);
@@ -88,7 +89,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'"');
 
 		// check valid strings without whitespaces and trailing delimiter
-		byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes();
+		byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 		
 		int startPos = 0;
@@ -106,14 +107,14 @@ public class VarLengthStringParserTest {
 		
 		
 		// check single field not terminated
-		recBytes = "\"abcde\"".getBytes();
+		recBytes = "\"abcde\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 7);
 		assertTrue(s.getValue().equals("abcde"));
 		
 		// check last field not terminated
-		recBytes = "\"abcde\"|\"fg\"".getBytes();
+		recBytes = "\"abcde\"|\"fg\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 8);
@@ -124,7 +125,7 @@ public class VarLengthStringParserTest {
 		assertTrue(s.getValue().equals("fg"));
 		
 		// check delimiter in quotes 
-		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes();
+		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 11);
@@ -135,7 +136,7 @@ public class VarLengthStringParserTest {
 		assertTrue(s.getValue().equals("hij|kl|mn|op"));
 		
 		// check delimiter in quotes last field not terminated
-		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes();
+		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 11);
@@ -153,7 +154,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'@');
 
 		// check valid strings without whitespaces and trailing delimiter
-		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;
@@ -187,7 +188,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'"');
 
 		// check valid strings without whitespaces and trailing delimiter
-		byte[] recBytes = "\"abcdefgh\"-|\"jklmno  ".getBytes();
+		byte[] recBytes = "\"abcdefgh\"-|\"jklmno  ".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;
@@ -207,7 +208,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte) '@');
 
 		// check valid strings without whitespaces and trailing delimiter
-		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 4a1556a..c6f46e3 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.test.socket;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.test.testdata.WordCountData;
@@ -62,7 +63,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 						new String[] { "--port", String.valueOf(serverPort) });
 
 				if (errorMessages.size() != 0) {
-					fail("Found error message: " + new String(errorMessages.toByteArray()));
+					fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
 				}
 				
 				serverThread.join();
@@ -101,7 +102,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 						new String[] { "--port", String.valueOf(serverPort) });
 
 				if (errorMessages.size() != 0) {
-					fail("Found error message: " + new String(errorMessages.toByteArray()));
+					fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
 				}
 				
 				serverThread.join();
@@ -154,4 +155,4 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 		@Override
 		public void write(int b) {}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index df68a76..bc42838 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -294,7 +295,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx + ": " + sampleLine + " " + i + "\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 		return new Tuple2<>(tmp, str.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 440bfcc..e271a21 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -389,7 +390,7 @@ public class ContinuousFileProcessingMigrationTest {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index f579345..19358e3 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -1042,7 +1043,7 @@ public class ContinuousFileProcessingTest {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 75e666f..8a3f662 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -80,7 +81,7 @@ public class HDFSTest {
 			hdfs = hdPath.getFileSystem(hdConf);
 			FSDataOutputStream stream = hdfs.create(hdPath);
 			for(int i = 0; i < 10; i++) {
-				stream.write("Hello HDFS\n".getBytes());
+				stream.write("Hello HDFS\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
 			}
 			stream.close();
 
@@ -193,7 +194,7 @@ public class HDFSTest {
 
 		fs.mkdirs(directory);
 
-		byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes();
+		byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		for (Path file: Arrays.asList(singleFile, directoryFile)) {
 			org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 05ed6fa..d454765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -78,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 		if (parser.resetErrorStateAndParse(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) {
 			return parser.getLastResult();
 		} else {
-			String s = new String(bytes, offset, numBytes);
+			String s = new String(bytes, offset, numBytes, getCharset());
 			throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index ce37c74..b752966 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -155,7 +155,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
 				if (isLenient()) {
 					return false;
 				} else {
-					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index a303ff7..d047aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -79,7 +80,7 @@ public class CsvInputFormatTest {
 		tempFile.deleteOnExit();
 
 		try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
-			fileOutputStream.write(fileContent.getBytes());
+			fileOutputStream.write(fileContent.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 
 		// fix the number of blocks and the size of each one.
@@ -793,7 +794,8 @@ public class CsvInputFormatTest {
 		for (Object[] failure : failures) {
 			String input = (String) failure[0];
 
-			int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null);
+			int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
+				input.length(), new byte[]{'|'}, null);
 
 			assertThat(result, is(-1));
 			assertThat(stringParser.getErrorState(), is(failure[1]));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index f6bda30..943db36 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -701,7 +702,7 @@ public class RowCsvInputFormatTest {
 
 		for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
 			int result = stringParser.parseField(
-				failure.getKey().getBytes(),
+				failure.getKey().getBytes(ConfigConstants.DEFAULT_CHARSET),
 				0,
 				failure.getKey().length(),
 				new byte[]{(byte) '|'},

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index d89fc41..3d79b08 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -13,6 +13,7 @@
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
 
 /*
 Utility function to deserialize strings, used for non-CSV sinks.
@@ -21,6 +22,6 @@ public class StringDeserializerMap implements MapFunction<byte[], String> {
 	@Override
 	public String map(byte[] value) throws Exception {
 		//discard type byte and size
-		return new String(value, 5, value.length - 5);
+		return new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index b6d60e1..af5eac6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -14,6 +14,7 @@ package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
 
 /*
 Utility function to deserialize strings, used for CSV sinks.
@@ -22,6 +23,6 @@ public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<St
 	@Override
 	public Tuple1<String> map(byte[] value) throws Exception {
 		//5 = string type byte + string size
-		return new Tuple1<>(new String(value, 5, value.length - 5));
+		return new Tuple1<>(new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..c968bd6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,19 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -23,9 +33,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
@@ -33,11 +41,6 @@ import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAM
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This streamer is used by functions to send/receive data to/from an external python process.
@@ -127,12 +130,13 @@ public class PythonStreamer implements Serializable {
 		Runtime.getRuntime().addShutdownHook(shutdownThread);
 
 		OutputStream processOutput = process.getOutputStream();
-		processOutput.write("operator\n".getBytes());
-		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
-		processOutput.write((id + "\n").getBytes());
-		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
-		processOutput.write((inputFilePath + "\n").getBytes());
-		processOutput.write((outputFilePath + "\n").getBytes());
+		processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((id + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
+			.getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.flush();
 
 		try { // wait a bit to catch syntax errors


[7/9] flink git commit: [FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors while processing async messages in the Akka RPC actor

Posted by se...@apache.org.
[FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors while processing async messages in the Akka RPC actor

This closes #3360


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527eabdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527eabdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527eabdd

Branch: refs/heads/master
Commit: 527eabdd4fb3e34b0698b53ec9a7fb1348882791
Parents: 8b49ee5
Author: 淘江 <ta...@alibaba-inc.com>
Authored: Mon Feb 20 17:54:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/util/ExceptionUtils.java     | 13 +++++++++++++
 .../apache/flink/runtime/rpc/akka/AkkaRpcActor.java    |  6 ++++--
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7167a0b..ca81465 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -113,6 +113,19 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM
+	 * or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a
+	 * definition of fatal errors.
+	 *
+	 * @param t The Throwable to check and rethrow.
+	 */
+	public static void rethrowIfFatalErrorOrOOM(Throwable t) {
+		if (isJvmFatalError(t) || t instanceof OutOfMemoryError) {
+			throw (Error) t;
+		}
+	}
+
+	/**
 	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
 	 * to a prior exception, or returns the new exception, if no prior exception exists.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 264ba96..99f8211 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,8 +272,9 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 			// run immediately
 			try {
 				runAsync.getRunnable().run();
-			} catch (final Throwable e) {
-				LOG.error("Caught exception while executing runnable in main thread.", e);
+			} catch (Throwable t) {
+				LOG.error("Caught exception while executing runnable in main thread.", t);
+				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 			}
 		}
 		else {

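The actor-side change above follows a catch-log-rethrow-if-fatal pattern: every Throwable is logged, but fatal ones are re-raised so the JVM can terminate. A sketch of the same pattern in isolation, assuming only the ExceptionUtils method introduced in this commit (the GuardedRunner class is illustrative, not part of the commit):

import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedRunner {
	private static final Logger LOG = LoggerFactory.getLogger(GuardedRunner.class);

	public static void runGuarded(Runnable runnable) {
		try {
			runnable.run();
		} catch (Throwable t) {
			// Log every failure, but swallow only recoverable ones:
			// JVM-fatal errors and OutOfMemoryError are rethrown so the
			// process fails fast instead of running in a corrupted state.
			LOG.error("Caught exception while executing runnable.", t);
			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
		}
	}
}
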

[5/9] flink git commit: [FLINK-4545] [network] remove fixed-size BufferPool instances

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index e141cc2..424fc8b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -94,7 +95,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setInvokableClass(classOf[BlockingReceiver])
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
@@ -146,7 +148,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       receiver.setInvokableClass(classOf[BlockingReceiver])
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID


[6/9] flink git commit: [FLINK-4545] [network] remove fixed-size BufferPool instances

Posted by se...@apache.org.
[FLINK-4545] [network] remove fixed-size BufferPool instances

This closes #3467


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b49ee5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b49ee5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b49ee5a

Branch: refs/heads/master
Commit: 8b49ee5aa2e17b1787764c3265e1ebda47d89840
Parents: 3ab91cc
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 10 16:11:08 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  4 +-
 .../BackPressureStatsTrackerITCase.java         |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  4 +-
 .../serialization/SpanningRecordSerializer.java |  2 +-
 .../io/network/buffer/BufferPoolFactory.java    |  4 +-
 .../io/network/buffer/LocalBufferPool.java      | 36 +++++++++---
 .../io/network/buffer/NetworkBufferPool.java    | 24 +++-----
 .../netty/PartitionRequestServerHandler.java    |  2 +-
 .../network/partition/ResultPartitionType.java  | 16 +----
 .../runtime/jobgraph/IntermediateDataSet.java   |  8 ---
 .../flink/runtime/jobgraph/JobVertex.java       |  4 --
 .../ExecutionGraphConstructionTest.java         | 38 ++++++------
 .../ExecutionGraphDeploymentTest.java           |  6 +-
 .../ExecutionGraphSignalsTest.java              | 11 ++--
 .../ExecutionVertexLocalityTest.java            |  3 +-
 .../executiongraph/PointwisePatternTest.java    | 15 ++---
 .../executiongraph/VertexSlotSharingTest.java   |  5 +-
 .../io/network/MockNetworkEnvironment.java      |  2 +-
 .../io/network/api/writer/RecordWriterTest.java |  2 +-
 .../network/buffer/BufferPoolFactoryTest.java   | 31 +++-------
 .../network/buffer/NetworkBufferPoolTest.java   | 55 +++++-------------
 .../consumer/LocalInputChannelTest.java         |  4 +-
 .../flink/runtime/jobgraph/JobGraphTest.java    | 61 ++++++++++----------
 .../runtime/jobgraph/JobTaskVertexTest.java     |  5 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java    |  4 +-
 .../LeaderChangeJobRecoveryTest.java            |  3 +-
 .../LeaderChangeStateCleanupTest.java           |  3 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  3 +-
 .../jobmanager/CoLocationConstraintITCase.scala | 11 ++--
 .../runtime/jobmanager/JobManagerITCase.scala   | 40 ++++++++-----
 .../runtime/jobmanager/RecoveryITCase.scala     | 18 +++---
 .../runtime/jobmanager/SlotSharingITCase.scala  | 19 +++---
 .../TaskManagerFailsWithSlotSharingITCase.scala | 12 ++--
 .../io/BarrierBufferMassiveRandomTest.java      |  4 +-
 .../runtime/NetworkStackThroughputITCase.java   | 10 +++-
 .../ZooKeeperLeaderElectionITCase.java          |  4 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  7 ++-
 37 files changed, 238 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index caeb43f..61e5327 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1289,7 +1289,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		syncConfig.setNumberOfIterations(maxNumIterations);
 		
 		// connect the sync task
-		sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+		sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 		
 		// ----------------------------- create the iteration tail ------------------------------
 		
@@ -1425,7 +1425,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			syncConfig.setNumberOfIterations(maxNumIterations);
 			
 			// connect the sync task
-			sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+			sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 		}
 		
 		// ----------------------------- create the iteration tails -----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 30a86a2..1943129 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -108,7 +108,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			//
 			// 1) Consume all buffers at first (no buffers for the test task)
 			//
-			testBufferPool = networkBufferPool.createBufferPool(1, false);
+			testBufferPool = networkBufferPool.createBufferPool(1);
 			final List<Buffer> buffers = new ArrayList<>();
 			while (true) {
 				Buffer buffer = testBufferPool.requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 5cf2c26..8e85ffe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -171,7 +171,7 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions());
 					partition.registerBufferPool(bufferPool);
 
 					resultPartitionManager.registerResultPartition(partition);
@@ -198,7 +198,7 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
+					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels());
 					gate.setBufferPool(bufferPool);
 				} catch (Throwable t) {
 					if (bufferPool != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index cb5665b..6c541a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -43,7 +43,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Intermediate data serialization */
 	private final DataOutputSerializer serializationBuffer;
 
-	/** Intermediate buffer for data serialization */
+	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}) */
 	private ByteBuffer dataBuffer;
 
 	/** Intermediate buffer for length serialization */

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index 23321f4..e953158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -29,9 +29,9 @@ public interface BufferPoolFactory {
 	 * Tries to create a buffer pool, which is guaranteed to provide at least the number of required
 	 * buffers.
 	 *
-	 * <p> The buffer pool is either of dynamic size or fixed.
+	 * <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
 	 */
-	BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException;
+	BufferPool createBufferPool(int numRequiredBuffers) throws IOException;
 
 	/**
 	 * Destroy callback for updating factory book keeping.

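A minimal usage sketch of the simplified factory contract (segment counts are assumed for illustration): a pool states only its required minimum, and the factory may hand it more buffers whenever spare capacity exists.

    import java.io.IOException;

    import org.apache.flink.core.memory.MemoryType;
    import org.apache.flink.runtime.io.network.buffer.BufferPool;
    import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

    class BufferPoolSketch {
        static void sketch() throws IOException {
            // Assumed sizing: 32 network buffers of 1024 bytes each.
            NetworkBufferPool networkBufferPool = new NetworkBufferPool(32, 1024, MemoryType.HEAP);

            // A pool now declares only its required minimum; the former
            // boolean isFixedSize flag is gone.
            BufferPool pool = networkBufferPool.createBufferPool(8);

            // Spare buffers are redistributed, so a single registered pool
            // receives all 32 segments rather than just its minimum of 8.
            assert pool.getNumBuffers() >= 8;
        }
    }
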
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 86e6870..d6a4cf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -45,30 +45,46 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 class LocalBufferPool implements BufferPool {
 
+	/** Global network buffer pool to get buffers from. */
 	private final NetworkBufferPool networkBufferPool;
 
-	// The minimum number of required segments for this pool
+	/** The minimum number of required segments for this pool */
 	private final int numberOfRequiredMemorySegments;
 
-	// The currently available memory segments. These are segments, which have been requested from
-	// the network buffer pool and are currently not handed out as Buffer instances.
+	/**
+	 * The currently available memory segments. These are segments, which have been requested from
+	 * the network buffer pool and are currently not handed out as Buffer instances.
+	 */
 	private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
 
-	// Buffer availability listeners, which need to be notified when a Buffer becomes available.
-	// Listeners can only be registered at a time/state where no Buffer instance was available.
+	/**
+	 * Buffer availability listeners, which need to be notified when a Buffer becomes available.
+	 * Listeners can only be registered at a time/state where no Buffer instance was available.
+	 */
 	private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
 
-	// The current size of this pool
+	/** The current size of this pool */
 	private int currentPoolSize;
 
-	// Number of all memory segments, which have been requested from the network buffer pool and are
-	// somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
+	/**
+	 * Number of all memory segments, which have been requested from the network buffer pool and are
+	 * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
+	 */
 	private int numberOfRequestedMemorySegments;
 
 	private boolean isDestroyed;
 
 	private BufferPoolOwner owner;
 
+	/**
+	 * Creates a local buffer pool based on the given <tt>networkBufferPool</tt>, with a minimum
+	 * number of network buffers guaranteed to be available.
+	 *
+	 * @param networkBufferPool
+	 * 		global network buffer pool to get buffers from
+	 * @param numberOfRequiredMemorySegments
+	 * 		minimum number of network buffers
+	 */
 	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
 		this.networkBufferPool = networkBufferPool;
 		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
@@ -265,11 +281,15 @@ class LocalBufferPool implements BufferPool {
 	// ------------------------------------------------------------------------
 
 	private void returnMemorySegment(MemorySegment segment) {
+		assert Thread.holdsLock(availableMemorySegments);
+
 		numberOfRequestedMemorySegments--;
 		networkBufferPool.recycle(segment);
 	}
 
 	private void returnExcessMemorySegments() {
+		assert Thread.holdsLock(availableMemorySegments);
+
 		while (numberOfRequestedMemorySegments > currentPoolSize) {
 			MemorySegment segment = availableMemorySegments.poll();
 			if (segment == null) {

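The assert Thread.holdsLock(...) guards added above make the locking contract of the private helpers explicit; a generic sketch of the pattern (class and field names are illustrative only):

    class GuardedResource {
        private final Object lock = new Object();
        private int requested;

        void release() {
            synchronized (lock) {
                returnOne(); // helper below must only run under 'lock'
            }
        }

        private void returnOne() {
            // Fails fast when assertions are enabled (-ea) if a caller ever
            // reaches this helper without holding 'lock'; with assertions
            // disabled the check costs nothing at runtime.
            assert Thread.holdsLock(lock);
            requested--;
        }
    }
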
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index dc23341..5345fbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -58,9 +58,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	private final Object factoryLock = new Object();
 
-	private final Set<LocalBufferPool> managedBufferPools = new HashSet<LocalBufferPool>();
-
-	public final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
+	private final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
 
 	private int numTotalRequiredBuffers;
 
@@ -182,7 +180,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException {
+	public BufferPool createBufferPool(int numRequiredBuffers) throws IOException {
 		// It is necessary to use a separate lock from the one used for buffer
 		// requests to ensure deadlock freedom for failure cases.
 		synchronized (factoryLock) {
@@ -209,12 +207,6 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			// non-fixed size buffers.
 			LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
 
-			// The fixed size pools get their share of buffers and don't change
-			// it during their lifetime.
-			if (!isFixedSize) {
-				managedBufferPools.add(localBufferPool);
-			}
-
 			allBufferPools.add(localBufferPool);
 
 			redistributeBuffers();
@@ -231,8 +223,6 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 		synchronized (factoryLock) {
 			if (allBufferPools.remove(bufferPool)) {
-				managedBufferPools.remove(bufferPool);
-
 				numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
 
 				try {
@@ -246,7 +236,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	/**
 	 * Destroys all buffer pools that allocate their buffers from this
-	 * buffer pool (created via {@link #createBufferPool(int, boolean)}).
+	 * buffer pool (created via {@link #createBufferPool(int)}).
 	 */
 	public void destroyAllBufferPools() {
 		synchronized (factoryLock) {
@@ -258,7 +248,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			}
 
 			// some sanity checks
-			if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+			if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
 				throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
 			}
 		}
@@ -266,7 +256,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	// Must be called from synchronized block
 	private void redistributeBuffers() throws IOException {
-		int numManagedBufferPools = managedBufferPools.size();
+		assert Thread.holdsLock(factoryLock);
+
+		int numManagedBufferPools = allBufferPools.size();
 
 		if (numManagedBufferPools == 0) {
 			return; // necessary to avoid div by zero when no managed pools
@@ -283,7 +275,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 		int bufferPoolIndex = 0;
 
-		for (LocalBufferPool bufferPool : managedBufferPools) {
+		for (LocalBufferPool bufferPool : allBufferPools) {
 			int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
 
 			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);

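With the managed/unmanaged distinction gone, redistributeBuffers() spreads the spare segments over all registered pools; a self-contained arithmetic sketch of that split, under assumed counts:

    class RedistributionSketch {
        public static void main(String[] args) {
            int numSpareBuffers = 100; // assumed spare segments
            int numPools = 8;          // assumed registered pools

            int excessPerPool = numSpareBuffers / numPools; // 12
            int numLeftovers = numSpareBuffers % numPools;  // 4

            int assigned = 0;
            for (int poolIndex = 0; poolIndex < numPools; poolIndex++) {
                // The first 'numLeftovers' pools get one extra buffer, so the
                // remainder is handed out without fractional shares:
                // 4 * 13 + 4 * 12 = 100.
                assigned += excessPerPool + (poolIndex < numLeftovers ? 1 : 0);
            }
            assert assigned == numSpareBuffers;
        }
    }

Each pool then ends up at its required minimum plus its share of the spare buffers, which is the value passed to setNumBuffers(...) in the loop above.
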
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 12b52ec..36c1234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 		super.channelRegistered(ctx);
 
-		bufferPool = networkBufferPool.createBufferPool(1, false);
+		bufferPool = networkBufferPool.createBufferPool(1);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 65d49ed..43d3a52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -20,14 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 public enum ResultPartitionType {
 
-	BLOCKING(true, false, false),
+	BLOCKING(false, false),
 
-	PIPELINED(false, true, true),
-
-	PIPELINED_PERSISTENT(true, true, true);
-
-	/** Does the partition live longer than the consuming task? */
-	private final boolean isPersistent;
+	PIPELINED(true, true);
 
 	/** Can the partition be consumed while being produced? */
 	private final boolean isPipelined;
@@ -38,8 +33,7 @@ public enum ResultPartitionType {
 	/**
 	 * Specifies the behaviour of an intermediate result partition at runtime.
 	 */
-	ResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) {
-		this.isPersistent = isPersistent;
+	ResultPartitionType(boolean isPipelined, boolean hasBackPressure) {
 		this.isPipelined = isPipelined;
 		this.hasBackPressure = hasBackPressure;
 	}
@@ -55,8 +49,4 @@ public enum ResultPartitionType {
 	public boolean isPipelined() {
 		return isPipelined;
 	}
-
-	public boolean isPersistent() {
-		return isPersistent;
-	}
 }

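After removing PIPELINED_PERSISTENT and the isPersistent flag, the enum reduces to the two variants defined above; a small sketch of what remains:

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

    class PartitionTypeSketch {
        static boolean sketch() {
            // Only two variants remain:
            // BLOCKING(false, false): consumable only after production
            //                         completes; no back pressure.
            // PIPELINED(true, true):  consumable while being produced;
            //                         exerts back pressure.
            ResultPartitionType type = ResultPartitionType.PIPELINED;
            return type.isPipelined(); // true
        }
    }
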
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 2d9faa8..f02aaa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -47,14 +47,6 @@ public class IntermediateDataSet implements java.io.Serializable {
 	private final ResultPartitionType resultType;
 	
 	// --------------------------------------------------------------------------------------------
-	
-	public IntermediateDataSet(JobVertex producer) {
-		this(new IntermediateDataSetID(), producer);
-	}
-	
-	public IntermediateDataSet(IntermediateDataSetID id, JobVertex producer) {
-		this(id, ResultPartitionType.PIPELINED, producer);
-	}
 
 	public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
 		this.id = checkNotNull(id);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 9dcaeeb..260bd74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -400,10 +400,6 @@ public class JobVertex implements java.io.Serializable {
 		return edge;
 	}
 
-	public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
-		return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
-	}
-
 	public JobEdge connectNewDataSetAsInput(
 			JobVertex input,
 			DistributionPattern distPattern,

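Since the two-argument overload is removed, call sites have to name the partition type explicitly; a minimal before/after sketch (the sender and receiver vertices are illustrative):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    class MigrationSketch {
        static void sketch() {
            JobVertex sender = new JobVertex("sender");
            JobVertex receiver = new JobVertex("receiver");

            // Before this commit, the two-argument overload silently
            // defaulted to PIPELINED:
            //   receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);

            // Now the partition type is an explicit, visible choice:
            receiver.connectNewDataSetAsInput(
                sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        }
    }
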
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index fa48384..e1bad56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -104,11 +104,11 @@ public class ExecutionGraphConstructionTest {
 		v4.setInvokableClass(AbstractInvokable.class);
 		v5.setInvokableClass(AbstractInvokable.class);
 
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
-		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
-		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
@@ -153,7 +153,7 @@ public class ExecutionGraphConstructionTest {
 		v3.setInvokableClass(AbstractInvokable.class);
 
 		// this creates an intermediate result for v1
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		
 		// create results for v2 and v3
 		IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
@@ -193,7 +193,7 @@ public class ExecutionGraphConstructionTest {
 
 		v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
 		v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL);
 		
 		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
@@ -230,7 +230,7 @@ public class ExecutionGraphConstructionTest {
 		v3.setInvokableClass(AbstractInvokable.class);
 		
 		// this creates an intermediate result for v1
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		
 		// create results for v2 and v3
 		IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
@@ -269,7 +269,7 @@ public class ExecutionGraphConstructionTest {
 
 		v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
 		v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL);
 		
 		List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
@@ -558,11 +558,11 @@ public class ExecutionGraphConstructionTest {
 		v4.setInvokableClass(AbstractInvokable.class);
 		v5.setInvokableClass(AbstractInvokable.class);
 
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
-		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
-		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
-		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
@@ -625,11 +625,11 @@ public class ExecutionGraphConstructionTest {
 			v4.setInvokableClass(AbstractInvokable.class);
 			v5.setInvokableClass(AbstractInvokable.class);
 			
-			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
-			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
-			v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
-			v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
-			v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 			
 			v3.setInputSplitSource(source1);
 			v5.setInputSplitSource(source2);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 3d2913f..30824e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -90,9 +90,9 @@ public class ExecutionGraphDeploymentTest {
 			v3.setInvokableClass(BatchTask.class);
 			v4.setInvokableClass(BatchTask.class);
 
-			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
-			v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
-			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 
 			ExecutionGraph eg = new ExecutionGraph(
 				TestingUtils.defaultExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 64b9aa2..27844c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -119,15 +120,15 @@ public class ExecutionGraphSignalsTest {
 		v4.setParallelism(dop[3]);
 		v5.setParallelism(dop[4]);
 
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		mockNumberOfInputs(1,0);
-		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		mockNumberOfInputs(3,1);
-		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		mockNumberOfInputs(3,2);
-		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		mockNumberOfInputs(4,3);
-		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		mockNumberOfInputs(4,2);
 
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index cfd4665..eb85a33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -200,7 +201,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		target.setInvokableClass(NoOpInvokable.class);
 
 		DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
-		target.connectNewDataSetAsInput(source, connectionPattern);
+		target.connectNewDataSetAsInput(source, connectionPattern, ResultPartitionType.PIPELINED);
 
 		JobGraph testJob = new JobGraph(jobId, "test job", source, target);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 5629c0b..8ff0032 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -62,7 +63,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -109,7 +110,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -157,7 +158,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -206,7 +207,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -253,7 +254,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -320,7 +321,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
@@ -378,7 +379,7 @@ public class PointwisePatternTest {
 		v1.setInvokableClass(AbstractInvokable.class);
 		v2.setInvokableClass(AbstractInvokable.class);
 	
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 	
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index bf17485..90e3368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
@@ -67,8 +68,8 @@ public class VertexSlotSharingTest {
 			v4.setInvokableClass(AbstractInvokable.class);
 			v5.setInvokableClass(AbstractInvokable.class);
 
-			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
-			v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			
 			SlotSharingGroup jg1 = new SlotSharingGroup();
 			v2.setSlotSharingGroup(jg1);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
index 718ab45..dcdf44c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
@@ -36,7 +36,7 @@ public class MockNetworkEnvironment {
 
 	static {
 		try {
-			when(networkBufferPool.createBufferPool(anyInt(), anyBoolean())).thenReturn(mock(BufferPool.class));
+			when(networkBufferPool.createBufferPool(anyInt())).thenReturn(mock(BufferPool.class));
 			when(networkEnvironment.getNetworkBufferPool()).thenReturn(networkBufferPool);
 
 			when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 900b5c3..ca1d0a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -177,7 +177,7 @@ public class RecordWriterTest {
 
 		try {
 			buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
-			bufferPool = spy(buffers.createBufferPool(1, true));
+			bufferPool = spy(buffers.createBufferPool(1));
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
 			when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 0ac84dc..49808c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -52,37 +52,20 @@ public class BufferPoolFactoryTest {
 
 	@Test(expected = IOException.class)
 	public void testRequireMoreThanPossible() throws IOException {
-		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, false);
-	}
-
-	@Test
-	public void testFixedPool() throws IOException {
-		BufferPool lbp = networkBufferPool.createBufferPool(1, true);
-
-		assertEquals(1, lbp.getNumBuffers());
+		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2);
 	}
 
 	@Test
 	public void testSingleManagedPoolGetsAll() throws IOException {
-		BufferPool lbp = networkBufferPool.createBufferPool(1, false);
+		BufferPool lbp = networkBufferPool.createBufferPool(1);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
 	}
 
 	@Test
-	public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
-		BufferPool fixed = networkBufferPool.createBufferPool(24, true);
-
-		BufferPool lbp = networkBufferPool.createBufferPool(1, false);
-
-		assertEquals(24, fixed.getNumBuffers());
-		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
-	}
-
-	@Test
 	public void testUniformDistribution() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0, false);
-		BufferPool second = networkBufferPool.createBufferPool(0, false);
+		BufferPool first = networkBufferPool.createBufferPool(0);
+		BufferPool second = networkBufferPool.createBufferPool(0);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
@@ -97,7 +80,7 @@ public class BufferPoolFactoryTest {
 
 			int numPools = numBuffers / 32;
 			for (int i = 0; i < numPools; i++) {
-				pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1), random.nextBoolean()));
+				pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1)));
 			}
 
 			int numDistributedBuffers = 0;
@@ -115,11 +98,11 @@ public class BufferPoolFactoryTest {
 
 	@Test
 	public void testCreateDestroy() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0, false);
+		BufferPool first = networkBufferPool.createBufferPool(0);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
-		BufferPool second = networkBufferPool.createBufferPool(0, false);
+		BufferPool second = networkBufferPool.createBufferPool(0);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index fd5c7a5..ab32685 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -47,15 +47,7 @@ public class NetworkBufferPoolTest {
 			assertTrue(globalPool.isDestroyed());
 
 			try {
-				globalPool.createBufferPool(2, true);
-				fail("Should throw an IllegalStateException");
-			}
-			catch (IllegalStateException e) {
-				// yippie!
-			}
-
-			try {
-				globalPool.createBufferPool(2, false);
+				globalPool.createBufferPool(2);
 				fail("Should throw an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -71,26 +63,21 @@ public class NetworkBufferPoolTest {
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+			NetworkBufferPool globalPool = new NetworkBufferPool(8, 128, MemoryType.HEAP);
 
-			BufferPool fixedPool = globalPool.createBufferPool(2, true);
-			BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
+			BufferPool lbp = globalPool.createBufferPool(5);
 
-			assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
-			assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(5, lbp.getNumberOfRequiredMemorySegments());
 
 			Buffer[] buffers = {
-					fixedPool.requestBuffer(),
-					fixedPool.requestBuffer(),
-
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer(),
-					nonFixedPool.requestBuffer()
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer(),
+					lbp.requestBuffer()
 			};
 
 			for (Buffer b : buffers) {
@@ -98,16 +85,14 @@ public class NetworkBufferPoolTest {
 				assertNotNull(b.getMemorySegment());
 			}
 
-			assertNull(fixedPool.requestBuffer());
-			assertNull(nonFixedPool.requestBuffer());
+			assertNull(lbp.requestBuffer());
 
 			// destroy all allocated ones
 			globalPool.destroyAllBufferPools();
 
 			// check the destroyed status
 			assertFalse(globalPool.isDestroyed());
-			assertTrue(fixedPool.isDestroyed());
-			assertTrue(nonFixedPool.isDestroyed());
+			assertTrue(lbp.isDestroyed());
 
 			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
 
@@ -122,15 +107,7 @@ public class NetworkBufferPoolTest {
 
 			// can request no more buffers
 			try {
-				fixedPool.requestBuffer();
-				fail("Should fail with an IllegalStateException");
-			}
-			catch (IllegalStateException e) {
-				// that's the way we like it, aha, aha
-			}
-
-			try {
-				nonFixedPool.requestBuffer();
+				lbp.requestBuffer();
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -138,7 +115,7 @@ public class NetworkBufferPoolTest {
 			}
 
 			// can create a new pool now
-			assertNotNull(globalPool.createBufferPool(10, false));
+			assertNotNull(globalPool.createBufferPool(8));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

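The reworked test drains the pool by requesting buffers until none are left; a generic sketch of that drain loop (assuming requestBuffer() may throw IOException, as declared on the buffer provider interfaces):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.buffer.BufferPool;

    class DrainSketch {
        static List<Buffer> drainPool(BufferPool bufferPool) throws IOException {
            List<Buffer> taken = new ArrayList<>();
            Buffer buffer;
            // requestBuffer() is non-blocking and returns null once every
            // segment of the pool's current size has been handed out.
            while ((buffer = bufferPool.requestBuffer()) != null) {
                taken.add(buffer);
            }
            return taken;
        }
    }
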
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index e05fb56..15ff2da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -128,7 +128,7 @@ public class LocalInputChannelTest {
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
-				networkBuffers.createBufferPool(producerBufferPoolSize, true));
+				networkBuffers.createBufferPool(producerBufferPoolSize));
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
@@ -162,7 +162,7 @@ public class LocalInputChannelTest {
 						i,
 						parallelism,
 						numberOfBuffersPerChannel,
-						networkBuffers.createBufferPool(parallelism, true),
+						networkBuffers.createBufferPool(parallelism),
 						partitionManager,
 						new TaskEventDispatcher(),
 						partitionIds)));

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 74f1adf..9f06c6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.junit.Test;
 
 public class JobGraphTest {
@@ -44,8 +45,8 @@ public class JobGraphTest {
 				JobVertex source1 = new JobVertex("source1");
 				JobVertex source2 = new JobVertex("source2");
 				JobVertex target = new JobVertex("target");
-				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
-				target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL);
+				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+				target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 				
 				jg.addVertex(source1);
 				jg.addVertex(source2);
@@ -84,11 +85,11 @@ public class JobGraphTest {
 			JobVertex intermediate1 = new JobVertex("intermediate1");
 			JobVertex intermediate2 = new JobVertex("intermediate2");
 			
-			target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
-			target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
-			target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE);
-			intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
-			intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, intermediate1, intermediate2, target1, target2);
@@ -121,19 +122,19 @@ public class JobGraphTest {
 			JobVertex l13 = new JobVertex("layer 1 - 3");
 			JobVertex l2 = new JobVertex("layer 2");
 			
-			root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE);
-			root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
-			root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE);
+			root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			
-			l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE);
-			l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE);
+			l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			
-			l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+			l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			
-			l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
-			l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			
-			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, root, l11, l13, l12, l2);
@@ -177,10 +178,10 @@ public class JobGraphTest {
 			JobVertex op2 = new JobVertex("op2");
 			JobVertex op3 = new JobVertex("op3");
 			
-			op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
-			op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE);
-			op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
-			op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
+			op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
@@ -206,10 +207,10 @@ public class JobGraphTest {
 			JobVertex v3 = new JobVertex("3");
 			JobVertex v4 = new JobVertex("4");
 			
-			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
-			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
-			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
-			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
 			try {
@@ -236,12 +237,12 @@ public class JobGraphTest {
 			JobVertex v4 = new JobVertex("4");
 			JobVertex target = new JobVertex("target");
 			
-			v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
-			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
-			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
-			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
-			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
-			target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+			v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
 			try {

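For context, the change applied throughout these hunks makes the result partition type an explicit argument of connectNewDataSetAsInput instead of an implicit default. A minimal sketch of the new wiring, assuming the three-argument overload shown above (vertex names are illustrative):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class ConnectExample {
        public static void main(String[] args) {
            JobVertex source = new JobVertex("source");
            JobVertex sink = new JobVertex("sink");

            // The partition type is now spelled out at the call site:
            // PIPELINED edges stream records as they are produced, while
            // BLOCKING edges are fully materialized before consumption
            // (both variants appear in the JsonGeneratorTest hunk below).
            sink.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            JobGraph graph = new JobGraph("example", source, sink);
            System.out.println(graph.getNumberOfVertices());
        }
    }
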
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 48f06b0..d94b93e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
 
@@ -41,7 +42,7 @@ public class JobTaskVertexTest {
 	public void testConnectDirectly() {
 		JobVertex source = new JobVertex("source");
 		JobVertex target = new JobVertex("target");
-		target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+		target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 		
 		assertTrue(source.isInputVertex());
 		assertFalse(source.isOutputVertex());
@@ -62,7 +63,7 @@ public class JobTaskVertexTest {
 		JobVertex source = new JobVertex("source");
 		JobVertex target1= new JobVertex("target1");
 		JobVertex target2 = new JobVertex("target2");
-		target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+		target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 		target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.ALL_TO_ALL);
 		
 		assertTrue(source.isInputVertex());

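Note that connectDataSetAsInput keeps its two-argument form in the hunk above; the partition type is fixed when the intermediate data set is created, so attaching a further consumer does not restate it. A sketch of that variant, mirroring the calls in JobTaskVertexTest:

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    public class ConnectExistingDataSetExample {
        public static void main(String[] args) {
            JobVertex source = new JobVertex("source");
            JobVertex target1 = new JobVertex("target1");
            JobVertex target2 = new JobVertex("target2");

            // Creating a new data set fixes its partition type here...
            target1.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

            // ...so a second consumer of the same data set does not restate it.
            IntermediateDataSet dataSet = source.getProducedDataSets().get(0);
            target2.connectDataSetAsInput(dataSet, DistributionPattern.ALL_TO_ALL);
        }
    }
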
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index d1d5f03..62b9b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -64,8 +64,8 @@ public class JsonGeneratorTest {
 			join2.connectNewDataSetAsInput(join1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			join2.connectNewDataSetAsInput(source3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 			
-			sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE);
-			sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL);
+			sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 
 			JobGraph jg = new JobGraph("my job", source1, source2, source3,
 					intermediate1, intermediate2, join1, join2, sink1, sink2);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index fe33022..49d0239 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -129,7 +130,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		sender.setParallelism(parallelism);
 		receiver.setParallelism(parallelism);
 
-		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
 		sender.setSlotSharingGroup(slotSharingGroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 19cc444..7ae9974 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -272,7 +273,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		sender.setParallelism(parallelism);
 		receiver.setParallelism(parallelism);
 
-		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
 		sender.setSlotSharingGroup(slotSharingGroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 876e908..5a14b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -91,7 +92,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			JobVertex consumer = new JobVertex("AsyncConsumer");
 			consumer.setParallelism(1);
 			consumer.setInvokableClass(AsyncConsumer.class);
-			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
 			producer.setSlotSharingGroup(slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 12e2d63..d4b4cbf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -19,18 +19,18 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorSystem
-import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
@@ -60,7 +60,8 @@ class CoLocationConstraintITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
       sender.setSlotSharingGroup(sharingGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 31e72dd..5374d01 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
 import org.apache.flink.runtime.jobmanager.Tasks._
@@ -180,7 +181,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
@@ -215,7 +217,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
 
@@ -251,8 +254,10 @@ class JobManagerITCase(_system: ActorSystem)
       sender2.setParallelism(2 * num_tasks)
       receiver.setParallelism(3 * num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
@@ -296,8 +301,10 @@ class JobManagerITCase(_system: ActorSystem)
       sender2.setParallelism(2 * num_tasks)
       receiver.setParallelism(3 * num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
@@ -338,8 +345,10 @@ class JobManagerITCase(_system: ActorSystem)
       forwarder.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
-      receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)
+      forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
 
@@ -375,7 +384,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
@@ -423,7 +433,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
 
@@ -468,7 +479,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
@@ -508,7 +520,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 
@@ -556,7 +569,8 @@ class JobManagerITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index b96369f..f3ab409 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,22 +18,23 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import org.scalatest.junit.JUnitRunner
-import scala.concurrent.duration._
 
+import scala.concurrent.duration._
 import language.postfixOps
 
 @RunWith(classOf[JUnitRunner])
@@ -81,7 +82,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val executionConfig = new ExecutionConfig()
       executionConfig.setNumberOfExecutionRetries(1);
@@ -125,7 +127,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup
       sender.setSlotSharingGroup(sharingGroup)
@@ -173,7 +176,8 @@ class RecoveryITCase(_system: ActorSystem)
       sender.setParallelism(NUM_TASKS)
       receiver.setParallelism(NUM_TASKS)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup
       sender.setSlotSharingGroup(sharingGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index f986e73..4fffd68 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -19,18 +19,18 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorSystem
-import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
-import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
@@ -60,7 +60,8 @@ class SlotSharingITCase(_system: ActorSystem)
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
 
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
       sender.setSlotSharingGroup(sharingGroup)
@@ -107,8 +108,10 @@ class SlotSharingITCase(_system: ActorSystem)
       sender2.setSlotSharingGroup(sharingGroup)
       receiver.setSlotSharingGroup(sharingGroup)
 
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+        ResultPartitionType.PIPELINED)
 
       val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index d0136f0..9775d33 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{Kill, ActorSystem, PoisonPill}
+import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
@@ -61,7 +63,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
 
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup()
       sender.setSlotSharingGroup(sharingGroup)
@@ -110,7 +113,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
 
       sender.setParallelism(num_tasks)
       receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+        ResultPartitionType.PIPELINED)
 
       val sharingGroup = new SlotSharingGroup()
       sender.setSlotSharingGroup(sharingGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6e088f6..ec55f19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },

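The buffer pool hunk reflects a separate signature change: createBufferPool is now called with only the number of required buffers, dropping the boolean second argument. A sketch of the updated call, assuming the single-argument overload used above (PAGE_SIZE value is illustrative):

    import org.apache.flink.core.memory.MemoryType;
    import org.apache.flink.runtime.io.network.buffer.BufferPool;
    import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

    public class BufferPoolExample {
        private static final int PAGE_SIZE = 32 * 1024;

        public static void main(String[] args) throws Exception {
            NetworkBufferPool networkBufferPool =
                new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP);

            // only the number of required buffers is passed now
            BufferPool pool = networkBufferPool.createBufferPool(100);

            pool.lazyDestroy();
            networkBufferPool.destroy();
        }
    }
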
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3d0e5ab..b38df61 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -129,11 +130,14 @@ public class NetworkStackThroughputITCase {
 			consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
 
 			if (useForwarder) {
-				forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
-				consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL);
+				forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+					ResultPartitionType.PIPELINED);
+				consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+					ResultPartitionType.PIPELINED);
 			}
 			else {
-				consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
+				consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+					ResultPartitionType.PIPELINED);
 			}
 
 			return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index eacdeb4..df4f370 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -165,7 +166,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		sender.setParallelism(parallelism);
 		receiver.setParallelism(parallelism);
 
-		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+			ResultPartitionType.PIPELINED);
 
 		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
 		sender.setSlotSharingGroup(slotSharingGroup);


[4/9] flink git commit: [hotfix] [checkpoints] Cleanups in PendingCheckpoint

Posted by se...@apache.org.
[hotfix] [checkpoints] Cleanups in PendingCheckpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f0f7a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f0f7a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f0f7a7

Branch: refs/heads/master
Commit: 59f0f7a74b60a0d82ea28defb8a3c57f6cd85bea
Parents: 65ccf7c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 28 19:38:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/PendingCheckpoint.java   | 67 +++++++++++---------
 1 file changed, 38 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59f0f7a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6c9dbaf..5ca6040 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,19 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -51,6 +38,19 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
  * acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -78,18 +78,17 @@ public class PendingCheckpoint {
 	/** Set of acknowledged tasks */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
-	/**
-	 * The checkpoint properties. If the checkpoint should be persisted
-	 * externally, it happens in {@link #finalizeCheckpointExternalized()}.
-	 */
+	/** The checkpoint properties. If the checkpoint should be persisted
+	 * externally, it happens in {@link #finalizeCheckpointExternalized()}. */
 	private final CheckpointProperties props;
 
 	/** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
 	private final String targetDirectory;
 
 	/** The promise to fulfill once the checkpoint has been completed. */
-	private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>();
+	private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
+	/** The executor for potentially blocking I/O operations, like state disposal */
 	private final Executor executor;
 
 	private int numAcknowledgedTasks;
@@ -110,14 +109,6 @@ public class PendingCheckpoint {
 			CheckpointProperties props,
 			String targetDirectory,
 			Executor executor) {
-		this.jobId = checkNotNull(jobId);
-		this.checkpointId = checkpointId;
-		this.checkpointTimestamp = checkpointTimestamp;
-		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
-		this.taskStates = new HashMap<>();
-		this.props = checkNotNull(props);
-		this.targetDirectory = targetDirectory;
-		this.executor = Preconditions.checkNotNull(executor);
 
 		// Sanity check
 		if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -127,7 +118,17 @@ public class PendingCheckpoint {
 		checkArgument(verticesToConfirm.size() > 0,
 				"Checkpoint needs at least one vertex that commits the checkpoint");
 
-		acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
+		this.jobId = checkNotNull(jobId);
+		this.checkpointId = checkpointId;
+		this.checkpointTimestamp = checkpointTimestamp;
+		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+		this.props = checkNotNull(props);
+		this.targetDirectory = targetDirectory;
+		this.executor = Preconditions.checkNotNull(executor);
+
+		this.taskStates = new HashMap<>();
+		this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
+		this.onCompletionPromise = new FlinkCompletableFuture<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
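The constructor reordering above follows a validate-then-assign shape: the sanity checks run before any field is written, so a failed check cannot leave a half-initialized object behind. A small, hypothetical sketch of the same shape (this class is not part of the Flink code base):

    public class ValidatedHolder {
        private final String targetDirectory;
        private final int numTasks;

        public ValidatedHolder(String targetDirectory, int numTasks, boolean externalize) {
            // sanity checks first...
            if (externalize && targetDirectory == null) {
                throw new NullPointerException(
                    "Checkpoint to be externalized, but no target directory set");
            }
            if (numTasks <= 0) {
                throw new IllegalArgumentException("Need at least one task");
            }
            // ...then assign all fields in one block
            this.targetDirectory = targetDirectory;
            this.numTasks = numTasks;
        }
    }
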
@@ -193,7 +194,7 @@ public class PendingCheckpoint {
 	 * @param trackerCallback Callback for collecting subtask stats.
 	 */
 	void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
-		this.statsCallback = checkNotNull(trackerCallback);
+		this.statsCallback = trackerCallback;
 	}
 
 	// ------------------------------------------------------------------------
@@ -289,6 +290,8 @@ public class PendingCheckpoint {
 
 		onCompletionPromise.complete(completed);
 
+		// to prevent null-pointers from concurrent modification, copy reference onto stack
+		PendingCheckpointStats statsCallback = this.statsCallback;
 		if (statsCallback != null) {
 			// Finalize the statsCallback and give the completed checkpoint a
 			// callback for discards.
@@ -342,7 +345,8 @@ public class PendingCheckpoint {
 				TaskState taskState = taskStates.get(jobVertexID);
 
 				if (null == taskState) {
-					ChainedStateHandle<StreamStateHandle> nonPartitionedState =
+					@SuppressWarnings("deprecation")
+					ChainedStateHandle<StreamStateHandle> nonPartitionedState = 
 							subtaskState.getLegacyOperatorState();
 					ChainedStateHandle<OperatorStateHandle> partitioneableState =
 							subtaskState.getManagedOperatorState();
@@ -371,6 +375,9 @@ public class PendingCheckpoint {
 
 			++numAcknowledgedTasks;
 
+			// publish the checkpoint statistics
+			// to prevent null-pointers from concurrent modification, copy reference onto stack
+			final PendingCheckpointStats statsCallback = this.statsCallback;
 			if (statsCallback != null) {
 				// Do this in millis because the web frontend works with them
 				long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
@@ -493,6 +500,8 @@ public class PendingCheckpoint {
 	 * @param cause The failure cause or <code>null</code>.
 	 */
 	private void reportFailedCheckpoint(Exception cause) {
+		// to prevent null-pointers from concurrent modification, copy reference onto stack
+		final PendingCheckpointStats statsCallback = this.statsCallback;
 		if (statsCallback != null) {
 			long failureTimestamp = System.currentTimeMillis();
 			statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
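
The recurring "copy reference onto stack" comment above describes a standard defense when a mutable field can be cleared by another thread: read the field once into a local variable, then null-check and use only the local, so the check and the call are guaranteed to observe the same reference. A small hypothetical sketch of the pattern (Reporter and StatsCallback are illustrative names, not the Flink types):

    public class Reporter {
        /** may be set and cleared from other threads */
        private volatile StatsCallback statsCallback;

        void reportFailed(Exception cause) {
            // copy the reference onto the stack: even if the field is
            // nulled concurrently, the local cannot change underneath us
            final StatsCallback callback = this.statsCallback;
            if (callback != null) {
                callback.reportFailedCheckpoint(System.currentTimeMillis(), cause);
            }
        }
    }

    interface StatsCallback {
        void reportFailedCheckpoint(long failureTimestamp, Exception cause);
    }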