You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:27 UTC

[2/9] flink git commit: [FLINK-5824] Fix String/byte conversions without explicit encoding

[FLINK-5824] Fix String/byte conversions without explicit encoding

This closes #3468


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fedbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fedbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fedbd2

Branch: refs/heads/master
Commit: 53fedbd2894c6c7b839d8fdcc0dbf1e6e21e631a
Parents: 84afd06
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Fri Mar 3 13:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |   9 +-
 .../kafka/internals/ZookeeperOffsetHandler.java |   5 +-
 .../testutils/ZooKeeperStringSerializer.java    |   9 +-
 .../manualtests/ManualConsumerProducerTest.java |   3 +-
 ...nualExactlyOnceWithStreamReshardingTest.java |   4 +-
 .../kinesis/manualtests/ManualProducerTest.java |   3 +-
 .../testutils/FakeKinesisBehavioursFactory.java |   5 +-
 .../nifi/examples/NiFiSinkTopologyExample.java  |   4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      |   7 +-
 .../typeutils/runtime/StringArrayWritable.java  |   5 +-
 .../hbase/example/HBaseFlinkTestConstants.java  |   6 +-
 .../addons/hbase/example/HBaseWriteExample.java |   5 +-
 .../state/RocksDBKeyedStateBackend.java         |   8 +-
 .../state/RocksDBMergeIteratorTest.java         |   3 +-
 .../flink/storm/wrappers/StormTupleTest.java    |   5 +-
 .../io/SimpleTweetInputFormat.java              |   2 +-
 .../api/common/io/GenericCsvInputFormat.java    |   6 +-
 .../flink/configuration/ConfigConstants.java    |   7 ++
 .../memory/ByteArrayOutputStreamWithPos.java    |   3 +-
 .../apache/flink/types/parser/BigIntParser.java |   5 +-
 .../flink/types/parser/BooleanParser.java       |   9 +-
 .../apache/flink/types/parser/DoubleParser.java |   5 +-
 .../flink/types/parser/DoubleValueParser.java   |   3 +-
 .../apache/flink/types/parser/FloatParser.java  |   5 +-
 .../flink/types/parser/FloatValueParser.java    |   3 +-
 .../flink/types/parser/SqlDateParser.java       |   5 +-
 .../flink/types/parser/SqlTimeParser.java       |   5 +-
 .../flink/types/parser/SqlTimestampParser.java  |   5 +-
 .../api/common/io/DelimitedInputFormatTest.java |   7 +-
 .../api/common/io/FileInputFormatTest.java      |   7 +-
 .../common/state/ValueStateDescriptorTest.java  |   2 +-
 .../runtime/kryo/KryoClearedBufferTest.java     |   8 +-
 .../flink/types/parser/ParserTestBase.java      |  21 ++--
 .../types/parser/VarLengthStringParserTest.java |  23 ++--
 .../socket/SocketWindowWordCountITCase.java     |   7 +-
 .../ContinuousFileProcessingITCase.java         |   3 +-
 .../ContinuousFileProcessingMigrationTest.java  |   3 +-
 .../hdfstests/ContinuousFileProcessingTest.java |   3 +-
 .../org/apache/flink/hdfstests/HDFSTest.java    |   5 +-
 .../flink/api/java/io/PrimitiveInputFormat.java |   2 +-
 .../flink/api/java/io/RowCsvInputFormat.java    |   2 +-
 .../flink/api/java/io/CsvInputFormatTest.java   |   6 +-
 .../api/java/io/RowCsvInputFormatTest.java      |   3 +-
 .../functions/util/StringDeserializerMap.java   |   3 +-
 .../util/StringTupleDeserializerMap.java        |   3 +-
 .../api/streaming/data/PythonStreamer.java      |  34 +++---
 .../api/streaming/plan/PythonPlanReceiver.java  |   4 +-
 .../api/streaming/plan/PythonPlanStreamer.java  |   9 +-
 .../api/streaming/util/SerializationUtils.java  |   8 +-
 .../api/streaming/util/StreamPrinter.java       |   4 +-
 .../store/ZooKeeperMesosWorkerStore.java        |   7 +-
 .../flink/metrics/statsd/StatsDReporter.java    |   3 +-
 .../metrics/statsd/StatsDReporterTest.java      |   2 +-
 .../runtime/webmonitor/HttpRequestHandler.java  |   4 +-
 .../webmonitor/PipelineErrorHandler.java        |   6 +-
 .../webmonitor/RuntimeMonitorHandler.java       |   3 +-
 .../handlers/ConstantTextHandler.java           |  10 +-
 .../handlers/HandlerRedirectUtils.java          |   7 +-
 .../JobCancellationWithSavepointHandlers.java   |   2 +-
 .../handlers/TaskManagerLogHandler.java         |   3 +-
 .../handlers/TaskManagerLogHandlerTest.java     |   3 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   3 +-
 .../checkpoint/savepoint/SavepointV1Test.java   |  14 ++-
 .../serialization/types/AsciiStringType.java    |   9 +-
 .../runtime/operators/DataSinkTaskTest.java     |   9 +-
 .../runtime/operators/DataSourceTaskTest.java   |   7 +-
 .../FsCheckpointStateOutputStreamTest.java      |   3 +-
 .../util/serialization/SimpleStringSchema.java  |   5 +-
 .../functions/sink/SocketClientSinkTest.java    |   5 +-
 ...ontinuousFileProcessingCheckpointITCase.java |   3 +-
 .../test/misc/CheckForbiddenMethodsUsage.java   | 115 +++++++++++++++++++
 71 files changed, 359 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 600d1e5..1db45a5 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -25,13 +25,14 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import java.io.IOException;
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
 	/**
@@ -154,7 +155,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		}
 
 		if(userDefinedSchema != null) {
-			byte[] json = userDefinedSchema.toString().getBytes();
+			byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(json.length);
 			out.write(json);
 		} else {
@@ -175,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			byte[] json = new byte[length];
 			in.readFully(json);
 
-			Schema schema = new Schema.Parser().parse(new String(json));
+			Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
 			setSchema(schema);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index cec980f..c02c2cb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class ZookeeperOffsetHandler {
 		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
 		String path = topicDirs.consumerOffsetDir() + "/" + partition;
 		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		byte[] data = Long.toString(offset).getBytes();
+		byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
 		curatorClient.setData().forPath(path, data);
 	}
 
@@ -133,7 +134,7 @@ public class ZookeeperOffsetHandler {
 		if (data == null) {
 			return null;
 		} else {
-			String asString = new String(data);
+			String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
 			if (asString.length() == 0) {
 				return null;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 8a4c408..37ed408 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -19,20 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Simple ZooKeeper serializer for Strings.
  */
 public class ZooKeeperStringSerializer implements ZkSerializer {
 
-	private static final Charset CHARSET = Charset.forName("UTF-8");
-	
 	@Override
 	public byte[] serialize(Object data) {
 		if (data instanceof String) {
-			return ((String) data).getBytes(CHARSET);
+			return ((String) data).getBytes(ConfigConstants.DEFAULT_CHARSET);
 		}
 		else {
 			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
@@ -45,7 +42,7 @@ public class ZooKeeperStringSerializer implements ZkSerializer {
 			return null;
 		}
 		else {
-			return new String(bytes, CHARSET);
+			return new String(bytes, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 6e02a55..63c6c2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
@@ -62,7 +63,7 @@ public class ManualConsumerProducerTest {
 				new KinesisSerializationSchema<String>() {
 					@Override
 					public ByteBuffer serialize(String element) {
-						return ByteBuffer.wrap(element.getBytes());
+						return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
 					}
 
 					// every 10th element goes into a different stream

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 6abea2a..71bcae3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
@@ -119,7 +119,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 								}
 								batch.add(
 									new PutRecordsRequestEntry()
-										.withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
+										.withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes(ConfigConstants.DEFAULT_CHARSET)))
 										.withPartitionKey(UUID.randomUUID().toString()));
 							}
 							count += batchSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 35e9ef6..1df717c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
@@ -59,7 +60,7 @@ public class ManualProducerTest {
 				new KinesisSerializationSchema<String>() {
 					@Override
 					public ByteBuffer serialize(String element) {
-						return ByteBuffer.wrap(element.getBytes());
+						return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
 					}
 
 					// every 10th element goes into a different stream

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 964ee76..b62e7de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -21,14 +21,15 @@ import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -196,7 +197,7 @@ public class FakeKinesisBehavioursFactory {
 			for (int i = min; i < max; i++) {
 				batch.add(
 					new Record()
-						.withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+						.withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
 						.withPartitionKey(UUID.randomUUID().toString())
 						.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
 						.withSequenceNumber(String.valueOf(i)));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
index 572f949..202e80a 100644
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.nifi.examples;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
@@ -45,7 +46,8 @@ public class NiFiSinkTopologyExample {
 				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
 					@Override
 					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
-						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+						return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
+							new HashMap<String,String>());
 					}
 				}));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 26434ed..b65ddf0 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -45,7 +47,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.modules.junit4.PowerMockRunner;
-import com.rabbitmq.client.Connection;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -323,7 +324,7 @@ public class RMQSourceTest {
 			} catch (InterruptedException e) {
 				e.printStackTrace();
 			}
-			return new String(message);
+			return new String(message, ConfigConstants.DEFAULT_CHARSET);
 		}
 
 		@Override
@@ -365,7 +366,7 @@ public class RMQSourceTest {
 
 			// Mock for delivery
 			final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
-			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes(ConfigConstants.DEFAULT_CHARSET));
 
 			try {
 				Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index c32f5da..8c3a8cd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.hadoop.io.Writable;
 
 import java.io.DataInput;
@@ -41,7 +42,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
 		out.writeInt(this.array.length);
 		
 		for(String str : this.array) {
-			byte[] b = str.getBytes();
+			byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			out.writeInt(b.length);
 			out.write(b);
 		}
@@ -54,7 +55,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
 		for(int i = 0; i < this.array.length; i++) {
 			byte[] b = new byte[in.readInt()];
 			in.readFully(b);
-			this.array[i] = new String(b);
+			this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index 8579dee..f56295e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.addons.hbase.example;
 
+import org.apache.flink.configuration.ConfigConstants;
+
 public class HBaseFlinkTestConstants {
 	
-	public static final byte[] CF_SOME = "someCf".getBytes();
-	public static final byte[] Q_SOME = "someQual".getBytes();
+	public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+	public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
 	public static final String TEST_TABLE_NAME = "test-table";
 	public static final String TMP_DIR = "/tmp/test";
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 483bdff..64d20c3 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -87,7 +88,7 @@ public class HBaseWriteExample {
 			@Override
 			public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
 				reuse.f0 = new Text(t.f0);
-				Put put = new Put(t.f0.getBytes());
+				Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
 				put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
 				reuse.f1 = put;
 				return reuse;
@@ -199,4 +200,4 @@ public class HBaseWriteExample {
 		"The fair Ophelia!--Nymph, in thy orisons",
 		"Be all my sins remember'd."
 	};
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bd8d4dd..eb926c0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
@@ -171,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
 		// RocksDB seems to need this...
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
 		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
 		try {
 
@@ -727,7 +728,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				if (null == columnFamily) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-							metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+						metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						rocksDBKeyedStateBackend.columnOptions);
 
 					RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
 							new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
@@ -824,7 +826,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
-				descriptor.getName().getBytes(), columnOptions);
+				descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 1cb3b2b..956ef2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Assert;
@@ -71,7 +72,7 @@ public class RocksDBMergeIteratorTest {
 
 			for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
 				ColumnFamilyHandle handle = rocksDB.createColumnFamily(
-						new ColumnFamilyDescriptor(("column-" + c).getBytes()));
+						new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
 
 				ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
 				DataOutputStream dos = new DataOutputStream(bos);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index 7ea4b76..5e6c160 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.storm.wrappers;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.MessageId;
@@ -188,7 +189,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testString() {
 		final byte[] data = new byte[this.r.nextInt(15)];
 		this.r.nextBytes(data);
-		final String flinkTuple = new String(data);
+		final String flinkTuple = new String(data, ConfigConstants.DEFAULT_CHARSET);
 
 		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null, -1, null, null,
 				null);
@@ -304,7 +305,7 @@ public class StormTupleTest extends AbstractTest {
 	public void testStringTuple() {
 		final byte[] rawdata = new byte[this.r.nextInt(15)];
 		this.r.nextBytes(rawdata);
-		final String data = new String(rawdata);
+		final String data = new String(rawdata, ConfigConstants.DEFAULT_CHARSET);
 
 		final int index = this.r.nextInt(5);
 		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
index a72fc14..f7f1bde 100644
--- a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -90,4 +90,4 @@ public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implemen
 	public TypeInformation<Tweet> getProducedType() {
 		return new GenericTypeInfo<Tweet>(Tweet.class);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index b934d41..bddaec9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -362,7 +362,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 				if (lenient) {
 					return false;
 				} else {
-					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
 				}
 			}
 
@@ -380,7 +380,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 					if (lenient) {
 						return false;
 					} else {
-						String lineAsString = new String(bytes, offset, numBytes);
+						String lineAsString = new String(bytes, offset, numBytes, getCharset());
 						throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
 								+ "ParserError " + parser.getErrorState() + " \n"
 								+ "Expect field types: "+fieldTypesToString() + " \n"
@@ -405,7 +405,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 				startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
 				if (startPos < 0) {
 					if (!lenient) {
-						String lineAsString = new String(bytes, offset, numBytes);
+						String lineAsString = new String(bytes, offset, numBytes, getCharset());
 						throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
 								+ "Expect field types: "+fieldTypesToString()+" \n"
 								+ "in file: "+filePath);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5129f20..c7c8b1a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,9 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
@@ -1428,6 +1431,10 @@ public final class ConfigConstants {
 	/** The environment variable name which contains the Flink installation root directory */
 	public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
 
+	// ---------------------------- Encoding ------------------------------
+
+	public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
 	/**
 	 * Not instantiable.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index ddfd30a..abf65b1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -97,7 +98,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 	}
 
 	public String toString() {
-		return new String(buffer, 0, count);
+		return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
 	}
 
 	private int getEndPosition() {

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 11e459a..4e1aa3e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.math.BigInteger;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link java.math.BigInteger}.
@@ -45,7 +46,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = new BigInteger(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return new BigInteger(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index f8b890a..908c05f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -19,6 +19,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 @PublicEvolving
 public class BooleanParser extends FieldParser<Boolean> {
@@ -27,12 +28,12 @@ public class BooleanParser extends FieldParser<Boolean> {
 
 	/** Values for true and false respectively. Must be lower case. */
 	private static final byte[][] TRUE = new byte[][] {
-			"true".getBytes(),
-			"1".getBytes()
+			"true".getBytes(ConfigConstants.DEFAULT_CHARSET),
+			"1".getBytes(ConfigConstants.DEFAULT_CHARSET)
 	};
 	private static final byte[][] FALSE = new byte[][] {
-			"false".getBytes(),
-			"0".getBytes()
+			"false".getBytes(ConfigConstants.DEFAULT_CHARSET),
+			"0".getBytes(ConfigConstants.DEFAULT_CHARSET)
 	};
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 2474adf..409cff2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a Double.
@@ -44,7 +45,7 @@ public class DoubleParser extends FieldParser<Double> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Double.parseDouble(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -101,7 +102,7 @@ public class DoubleParser extends FieldParser<Double> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Double.parseDouble(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 10b43c3..8f64691 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.DoubleValue;
 
 /**
@@ -43,7 +44,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			double value = Double.parseDouble(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index e76484e..5636a4e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Float}.
@@ -42,7 +43,7 @@ public class FloatParser extends FieldParser<Float> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Float.parseFloat(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -99,7 +100,7 @@ public class FloatParser extends FieldParser<Float> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Float.parseFloat(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index a834f22..83fe63f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -20,6 +20,7 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.FloatValue;
 
 /**
@@ -43,7 +44,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			float value = Float.parseFloat(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
index 859dcf8..24374b8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Date;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link java.sql.Date}.
@@ -45,7 +46,7 @@ public class SqlDateParser extends FieldParser<Date> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Date.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlDateParser extends FieldParser<Date> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Date.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
index fbddadc..363cbb9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Time;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Time}.
@@ -39,7 +40,7 @@ public class SqlTimeParser extends FieldParser<Time> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Time.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -96,7 +97,7 @@ public class SqlTimeParser extends FieldParser<Time> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Time.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
index 0bcb602..97443a5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
 
 import java.sql.Timestamp;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
 
 /**
  * Parses a text field into a {@link Timestamp}.
@@ -45,7 +46,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, endPos - startPos);
+		String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
 		try {
 			this.result = Timestamp.valueOf(str);
 			return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		final String str = new String(bytes, startPos, limitedLen);
+		final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
 		return Timestamp.valueOf(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 7ce0a2e..2ff5ee7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -73,11 +74,11 @@ public class DelimitedInputFormatTest {
 		cfg.setString("delimited-format.delimiter", "\n");
 		
 		format.configure(cfg);
-		assertEquals("\n", new String(format.getDelimiter()));
+		assertEquals("\n", new String(format.getDelimiter(), format.getCharset()));
 
 		cfg.setString("delimited-format.delimiter", "&-&");
 		format.configure(cfg);
-		assertEquals("&-&", new String(format.getDelimiter()));
+		assertEquals("&-&", new String(format.getDelimiter(), format.getCharset()));
 	}
 	
 	@Test
@@ -428,7 +429,7 @@ public class DelimitedInputFormatTest {
 		
 		@Override
 		public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
-			return new String(bytes, offset, numBytes);
+			return new String(bytes, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5599dd0..dfda372 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -269,7 +270,7 @@ public class FileInputFormatTest {
 			File luigiFile = temporaryFolder.newFile("_luigi");
 			File success = temporaryFolder.newFile("_SUCCESS");
 
-			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+			createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2, luigiFile, success);
 
 			// test that only the valid files are accepted
 			
@@ -308,7 +309,7 @@ public class FileInputFormatTest {
 
 			File[] files = { child1, child2 };
 
-			createTempFiles(contents.getBytes(), files);
+			createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), files);
 
 			// test that only the valid files are accepted
 
@@ -345,7 +346,7 @@ public class FileInputFormatTest {
 
 		File child1 = temporaryFolder.newFile("dataFile1.txt");
 		File child2 = temporaryFolder.newFile("another_file.bin");
-		createTempFiles(contents.getBytes(), child1, child2);
+		createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2);
 
 		// test that only the valid files are accepted
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 655ffd5..674f7e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -113,7 +113,7 @@ public class ValueStateDescriptorTest {
 		}
 		data[199000] = '\0';
 
-		String defaultValue = new String(data);
+		String defaultValue = new String(data, ConfigConstants.DEFAULT_CHARSET);
 
 		ValueStateDescriptor<String> descr =
 				new ValueStateDescriptor<String>("testName", serializer, defaultValue);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index 7572408..d85ff95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +36,7 @@ import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 public class KryoClearedBufferTest {
@@ -262,7 +264,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeBytes(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;
@@ -270,7 +272,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeChars(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;
@@ -278,7 +280,7 @@ public class KryoClearedBufferTest {
 
 		@Override
 		public void writeUTF(String s) throws IOException {
-			byte[] sBuffer = s.getBytes();
+			byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			checkSize(sBuffer.length);
 			System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
 			position += sBuffer.length;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 9b02147..51ace12 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -85,9 +86,9 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				FieldParser<T> parser2 = getParser();
 				FieldParser<T> parser3 = getParser();
 				
-				byte[] bytes1 = testValues[i].getBytes();
-				byte[] bytes2 = testValues[i].getBytes();
-				byte[] bytes3 = testValues[i].getBytes();
+				byte[] bytes1 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes2 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes3 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 				int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
 				int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&'}, parser2.createValue());
@@ -132,8 +133,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				String testVal1 = testValues[i] + "|";
 				String testVal2 = testValues[i] + "&&&&";
 
-				byte[] bytes1 = testVal1.getBytes();
-				byte[] bytes2 = testVal2.getBytes();
+				byte[] bytes1 = testVal1.getBytes(ConfigConstants.DEFAULT_CHARSET);
+				byte[] bytes2 = testVal2.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 				int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
 				int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 				
 				FieldParser<T> parser = getParser();
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 				
 				assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
@@ -318,7 +319,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			
 			for (int i = 0; i < testValues.length; i++) {
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				
 				
 				T result;
@@ -355,7 +356,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			
 			for (int i = 0; i < testValues.length; i++) {
 				
-				byte[] bytes = testValues[i].getBytes();
+				byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
 				
 				try {
 					parseMethod.invoke(null, bytes, 0, bytes.length, '|');
@@ -389,7 +390,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 		for (int i = 0; i < values.length; i++) {
 			String s = values[i];
 			
-			byte[] bytes = s.getBytes();
+			byte[] bytes = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
 			int numBytes = bytes.length;
 			System.arraycopy(bytes, 0, result, currPos, numBytes);
 			currPos += numBytes;
@@ -411,7 +412,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
 			FieldParser<T> parser = getParser();
 
 			for (String emptyString : emptyStrings) {
-				byte[] bytes = emptyString.getBytes();
+				byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
 				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 
 				assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 718274e..e6e6c62 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.types.parser;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
@@ -45,7 +46,7 @@ public class VarLengthStringParserTest {
 		this.parser = new StringValueParser();
 		
 		// check valid strings with out whitespaces and trailing delimiter
-		byte[] recBytes = "abcdefgh|i|jklmno|".getBytes();
+		byte[] recBytes = "abcdefgh|i|jklmno|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 		
 		int startPos = 0;
@@ -63,14 +64,14 @@ public class VarLengthStringParserTest {
 		
 		
 		// check single field not terminated
-		recBytes = "abcde".getBytes();
+		recBytes = "abcde".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 5);
 		assertTrue(s.getValue().equals("abcde"));
 		
 		// check last field not terminated
-		recBytes = "abcde|fg".getBytes();
+		recBytes = "abcde|fg".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 6);
@@ -88,7 +89,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'"');
 
 		// check valid strings with out whitespaces and trailing delimiter
-		byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes();
+		byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 		
 		int startPos = 0;
@@ -106,14 +107,14 @@ public class VarLengthStringParserTest {
 		
 		
 		// check single field not terminated
-		recBytes = "\"abcde\"".getBytes();
+		recBytes = "\"abcde\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 7);
 		assertTrue(s.getValue().equals("abcde"));
 		
 		// check last field not terminated
-		recBytes = "\"abcde\"|\"fg\"".getBytes();
+		recBytes = "\"abcde\"|\"fg\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 8);
@@ -124,7 +125,7 @@ public class VarLengthStringParserTest {
 		assertTrue(s.getValue().equals("fg"));
 		
 		// check delimiter in quotes 
-		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes();
+		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 11);
@@ -135,7 +136,7 @@ public class VarLengthStringParserTest {
 		assertTrue(s.getValue().equals("hij|kl|mn|op"));
 		
 		// check delimiter in quotes last field not terminated
-		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes();
+		recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		startPos = 0;
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos == 11);
@@ -153,7 +154,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'@');
 
 		// check valid strings with out whitespaces and trailing delimiter
-		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;
@@ -187,7 +188,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte)'"');
 
 		// check valid strings with out whitespaces and trailing delimiter
-		byte[] recBytes = "\"abcdefgh\"-|\"jklmno  ".getBytes();
+		byte[] recBytes = "\"abcdefgh\"-|\"jklmno  ".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;
@@ -207,7 +208,7 @@ public class VarLengthStringParserTest {
 		this.parser.enableQuotedStringParsing((byte) '@');
 
 		// check valid strings with out whitespaces and trailing delimiter
-		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
 		StringValue s = new StringValue();
 
 		int startPos = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 4a1556a..c6f46e3 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.test.socket;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.test.testdata.WordCountData;
@@ -62,7 +63,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 						new String[] { "--port", String.valueOf(serverPort) });
 
 				if (errorMessages.size() != 0) {
-					fail("Found error message: " + new String(errorMessages.toByteArray()));
+					fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
 				}
 				
 				serverThread.join();
@@ -101,7 +102,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 						new String[] { "--port", String.valueOf(serverPort) });
 
 				if (errorMessages.size() != 0) {
-					fail("Found error message: " + new String(errorMessages.toByteArray()));
+					fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
 				}
 				
 				serverThread.join();
@@ -154,4 +155,4 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
 		@Override
 		public void write(int b) {}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index df68a76..bc42838 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -294,7 +295,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx + ": " + sampleLine + " " + i + "\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 		return new Tuple2<>(tmp, str.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 440bfcc..e271a21 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -389,7 +390,7 @@ public class ContinuousFileProcessingMigrationTest {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index f579345..19358e3 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -1042,7 +1043,7 @@ public class ContinuousFileProcessingTest {
 		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
-			stream.write(line.getBytes());
+			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 		stream.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 75e666f..8a3f662 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -80,7 +81,7 @@ public class HDFSTest {
 			hdfs = hdPath.getFileSystem(hdConf);
 			FSDataOutputStream stream = hdfs.create(hdPath);
 			for(int i = 0; i < 10; i++) {
-				stream.write("Hello HDFS\n".getBytes());
+				stream.write("Hello HDFS\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
 			}
 			stream.close();
 
@@ -193,7 +194,7 @@ public class HDFSTest {
 
 		fs.mkdirs(directory);
 
-		byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes();
+		byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		for (Path file: Arrays.asList(singleFile, directoryFile)) {
 			org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 05ed6fa..d454765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -78,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 		if (parser.resetErrorStateAndParse(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) {
 			return parser.getLastResult();
 		} else {
-			String s = new String(bytes, offset, numBytes);
+			String s = new String(bytes, offset, numBytes, getCharset());
 			throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index ce37c74..b752966 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -155,7 +155,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
 				if (isLenient()) {
 					return false;
 				} else {
-					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index a303ff7..d047aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -79,7 +80,7 @@ public class CsvInputFormatTest {
 		tempFile.deleteOnExit();
 
 		try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
-			fileOutputStream.write(fileContent.getBytes());
+			fileOutputStream.write(fileContent.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		}
 
 		// fix the number of blocks and the size of each one.
@@ -793,7 +794,8 @@ public class CsvInputFormatTest {
 		for (Object[] failure : failures) {
 			String input = (String) failure[0];
 
-			int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null);
+			int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
+				input.length(), new byte[]{'|'}, null);
 
 			assertThat(result, is(-1));
 			assertThat(stringParser.getErrorState(), is(failure[1]));

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index f6bda30..943db36 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -701,7 +702,7 @@ public class RowCsvInputFormatTest {
 
 		for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
 			int result = stringParser.parseField(
-				failure.getKey().getBytes(),
+				failure.getKey().getBytes(ConfigConstants.DEFAULT_CHARSET),
 				0,
 				failure.getKey().length(),
 				new byte[]{(byte) '|'},

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index d89fc41..3d79b08 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -13,6 +13,7 @@
 package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
 
 /*
 Utility function to deserialize strings, used for non-CSV sinks.
@@ -21,6 +22,6 @@ public class StringDeserializerMap implements MapFunction<byte[], String> {
 	@Override
 	public String map(byte[] value) throws Exception {
 		//discard type byte and size
-		return new String(value, 5, value.length - 5);
+		return new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index b6d60e1..af5eac6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -14,6 +14,7 @@ package org.apache.flink.python.api.functions.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
 
 /*
 Utility function to deserialize strings, used for CSV sinks.
@@ -22,6 +23,6 @@ public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<St
 	@Override
 	public Tuple1<String> map(byte[] value) throws Exception {
 		//5 = string type byte + string size
-		return new Tuple1<>(new String(value, 5, value.length - 5));
+		return new Tuple1<>(new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..c968bd6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,19 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -23,9 +33,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
@@ -33,11 +41,6 @@ import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAM
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
 import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This streamer is used by functions to send/receive data to/from an external python process.
@@ -127,12 +130,13 @@ public class PythonStreamer implements Serializable {
 		Runtime.getRuntime().addShutdownHook(shutdownThread);
 
 		OutputStream processOutput = process.getOutputStream();
-		processOutput.write("operator\n".getBytes());
-		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
-		processOutput.write((id + "\n").getBytes());
-		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
-		processOutput.write((inputFilePath + "\n").getBytes());
-		processOutput.write((outputFilePath + "\n").getBytes());
+		processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((id + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
+			.getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.flush();
 
 		try { // wait a bit to catch syntax errors