You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:27 UTC
[2/9] flink git commit: [FLINK-5824] Fix String/byte conversions
without explicit encoding
[FLINK-5824] Fix String/byte conversions without explicit encoding
This closes #3468
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fedbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fedbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fedbd2
Branch: refs/heads/master
Commit: 53fedbd2894c6c7b839d8fdcc0dbf1e6e21e631a
Parents: 84afd06
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Fri Mar 3 13:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100
----------------------------------------------------------------------
.../flink/api/java/io/AvroOutputFormat.java | 9 +-
.../kafka/internals/ZookeeperOffsetHandler.java | 5 +-
.../testutils/ZooKeeperStringSerializer.java | 9 +-
.../manualtests/ManualConsumerProducerTest.java | 3 +-
...nualExactlyOnceWithStreamReshardingTest.java | 4 +-
.../kinesis/manualtests/ManualProducerTest.java | 3 +-
.../testutils/FakeKinesisBehavioursFactory.java | 5 +-
.../nifi/examples/NiFiSinkTopologyExample.java | 4 +-
.../connectors/rabbitmq/RMQSourceTest.java | 7 +-
.../typeutils/runtime/StringArrayWritable.java | 5 +-
.../hbase/example/HBaseFlinkTestConstants.java | 6 +-
.../addons/hbase/example/HBaseWriteExample.java | 5 +-
.../state/RocksDBKeyedStateBackend.java | 8 +-
.../state/RocksDBMergeIteratorTest.java | 3 +-
.../flink/storm/wrappers/StormTupleTest.java | 5 +-
.../io/SimpleTweetInputFormat.java | 2 +-
.../api/common/io/GenericCsvInputFormat.java | 6 +-
.../flink/configuration/ConfigConstants.java | 7 ++
.../memory/ByteArrayOutputStreamWithPos.java | 3 +-
.../apache/flink/types/parser/BigIntParser.java | 5 +-
.../flink/types/parser/BooleanParser.java | 9 +-
.../apache/flink/types/parser/DoubleParser.java | 5 +-
.../flink/types/parser/DoubleValueParser.java | 3 +-
.../apache/flink/types/parser/FloatParser.java | 5 +-
.../flink/types/parser/FloatValueParser.java | 3 +-
.../flink/types/parser/SqlDateParser.java | 5 +-
.../flink/types/parser/SqlTimeParser.java | 5 +-
.../flink/types/parser/SqlTimestampParser.java | 5 +-
.../api/common/io/DelimitedInputFormatTest.java | 7 +-
.../api/common/io/FileInputFormatTest.java | 7 +-
.../common/state/ValueStateDescriptorTest.java | 2 +-
.../runtime/kryo/KryoClearedBufferTest.java | 8 +-
.../flink/types/parser/ParserTestBase.java | 21 ++--
.../types/parser/VarLengthStringParserTest.java | 23 ++--
.../socket/SocketWindowWordCountITCase.java | 7 +-
.../ContinuousFileProcessingITCase.java | 3 +-
.../ContinuousFileProcessingMigrationTest.java | 3 +-
.../hdfstests/ContinuousFileProcessingTest.java | 3 +-
.../org/apache/flink/hdfstests/HDFSTest.java | 5 +-
.../flink/api/java/io/PrimitiveInputFormat.java | 2 +-
.../flink/api/java/io/RowCsvInputFormat.java | 2 +-
.../flink/api/java/io/CsvInputFormatTest.java | 6 +-
.../api/java/io/RowCsvInputFormatTest.java | 3 +-
.../functions/util/StringDeserializerMap.java | 3 +-
.../util/StringTupleDeserializerMap.java | 3 +-
.../api/streaming/data/PythonStreamer.java | 34 +++---
.../api/streaming/plan/PythonPlanReceiver.java | 4 +-
.../api/streaming/plan/PythonPlanStreamer.java | 9 +-
.../api/streaming/util/SerializationUtils.java | 8 +-
.../api/streaming/util/StreamPrinter.java | 4 +-
.../store/ZooKeeperMesosWorkerStore.java | 7 +-
.../flink/metrics/statsd/StatsDReporter.java | 3 +-
.../metrics/statsd/StatsDReporterTest.java | 2 +-
.../runtime/webmonitor/HttpRequestHandler.java | 4 +-
.../webmonitor/PipelineErrorHandler.java | 6 +-
.../webmonitor/RuntimeMonitorHandler.java | 3 +-
.../handlers/ConstantTextHandler.java | 10 +-
.../handlers/HandlerRedirectUtils.java | 7 +-
.../JobCancellationWithSavepointHandlers.java | 2 +-
.../handlers/TaskManagerLogHandler.java | 3 +-
.../handlers/TaskManagerLogHandlerTest.java | 3 +-
.../apache/flink/runtime/blob/BlobUtils.java | 3 +-
.../checkpoint/savepoint/SavepointV1Test.java | 14 ++-
.../serialization/types/AsciiStringType.java | 9 +-
.../runtime/operators/DataSinkTaskTest.java | 9 +-
.../runtime/operators/DataSourceTaskTest.java | 7 +-
.../FsCheckpointStateOutputStreamTest.java | 3 +-
.../util/serialization/SimpleStringSchema.java | 5 +-
.../functions/sink/SocketClientSinkTest.java | 5 +-
...ontinuousFileProcessingCheckpointITCase.java | 3 +-
.../test/misc/CheckForbiddenMethodsUsage.java | 115 +++++++++++++++++++
71 files changed, 359 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 600d1e5..1db45a5 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -25,13 +25,14 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
import java.io.IOException;
import java.io.Serializable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
/**
@@ -154,7 +155,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
}
if(userDefinedSchema != null) {
- byte[] json = userDefinedSchema.toString().getBytes();
+ byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(json.length);
out.write(json);
} else {
@@ -175,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
byte[] json = new byte[length];
in.readFully(json);
- Schema schema = new Schema.Parser().parse(new String(json));
+ Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
setSchema(schema);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index cec980f..c02c2cb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class ZookeeperOffsetHandler {
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
String path = topicDirs.consumerOffsetDir() + "/" + partition;
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
- byte[] data = Long.toString(offset).getBytes();
+ byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
curatorClient.setData().forPath(path, data);
}
@@ -133,7 +134,7 @@ public class ZookeeperOffsetHandler {
if (data == null) {
return null;
} else {
- String asString = new String(data);
+ String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
if (asString.length() == 0) {
return null;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 8a4c408..37ed408 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -19,20 +19,17 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Simple ZooKeeper serializer for Strings.
*/
public class ZooKeeperStringSerializer implements ZkSerializer {
- private static final Charset CHARSET = Charset.forName("UTF-8");
-
@Override
public byte[] serialize(Object data) {
if (data instanceof String) {
- return ((String) data).getBytes(CHARSET);
+ return ((String) data).getBytes(ConfigConstants.DEFAULT_CHARSET);
}
else {
throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
@@ -45,7 +42,7 @@ public class ZooKeeperStringSerializer implements ZkSerializer {
return null;
}
else {
- return new String(bytes, CHARSET);
+ return new String(bytes, ConfigConstants.DEFAULT_CHARSET);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 6e02a55..63c6c2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
@@ -62,7 +63,7 @@ public class ManualConsumerProducerTest {
new KinesisSerializationSchema<String>() {
@Override
public ByteBuffer serialize(String element) {
- return ByteBuffer.wrap(element.getBytes());
+ return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// every 10th element goes into a different stream
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 6abea2a..71bcae3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
@@ -119,7 +119,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
}
batch.add(
new PutRecordsRequestEntry()
- .withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
+ .withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString()));
}
count += batchSize;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 35e9ef6..1df717c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
@@ -59,7 +60,7 @@ public class ManualProducerTest {
new KinesisSerializationSchema<String>() {
@Override
public ByteBuffer serialize(String element) {
- return ByteBuffer.wrap(element.getBytes());
+ return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// every 10th element goes into a different stream
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 964ee76..b62e7de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -21,14 +21,15 @@ import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -196,7 +197,7 @@ public class FakeKinesisBehavioursFactory {
for (int i = min; i < max; i++) {
batch.add(
new Record()
- .withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+ .withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(i)));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
index 572f949..202e80a 100644
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.nifi.examples;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
@@ -45,7 +46,8 @@ public class NiFiSinkTopologyExample {
.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
@Override
public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
- return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+ return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
+ new HashMap<String,String>());
}
}));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 26434ed..b65ddf0 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -45,7 +47,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.modules.junit4.PowerMockRunner;
-import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -323,7 +324,7 @@ public class RMQSourceTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
- return new String(message);
+ return new String(message, ConfigConstants.DEFAULT_CHARSET);
}
@Override
@@ -365,7 +366,7 @@ public class RMQSourceTest {
// Mock for delivery
final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
- Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+ Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes(ConfigConstants.DEFAULT_CHARSET));
try {
Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index c32f5da..8c3a8cd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
@@ -41,7 +42,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
out.writeInt(this.array.length);
for(String str : this.array) {
- byte[] b = str.getBytes();
+ byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(b.length);
out.write(b);
}
@@ -54,7 +55,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
for(int i = 0; i < this.array.length; i++) {
byte[] b = new byte[in.readInt()];
in.readFully(b);
- this.array[i] = new String(b);
+ this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index 8579dee..f56295e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -18,10 +18,12 @@
package org.apache.flink.addons.hbase.example;
+import org.apache.flink.configuration.ConfigConstants;
+
public class HBaseFlinkTestConstants {
- public static final byte[] CF_SOME = "someCf".getBytes();
- public static final byte[] Q_SOME = "someQual".getBytes();
+ public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+ public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
public static final String TEST_TABLE_NAME = "test-table";
public static final String TMP_DIR = "/tmp/test";
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 483bdff..64d20c3 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Mutation;
@@ -87,7 +88,7 @@ public class HBaseWriteExample {
@Override
public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
reuse.f0 = new Text(t.f0);
- Put put = new Put(t.f0.getBytes());
+ Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
reuse.f1 = put;
return reuse;
@@ -199,4 +200,4 @@ public class HBaseWriteExample {
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bd8d4dd..eb926c0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
@@ -171,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
// RocksDB seems to need this...
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+ columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
try {
@@ -727,7 +728,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (null == columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
- metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+ metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+ rocksDBKeyedStateBackend.columnOptions);
RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
@@ -824,7 +826,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
- descriptor.getName().getBytes(), columnOptions);
+ descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 1cb3b2b..956ef2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
@@ -71,7 +72,7 @@ public class RocksDBMergeIteratorTest {
for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
ColumnFamilyHandle handle = rocksDB.createColumnFamily(
- new ColumnFamilyDescriptor(("column-" + c).getBytes()));
+ new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
DataOutputStream dos = new DataOutputStream(bos);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index 7ea4b76..5e6c160 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.storm.wrappers;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.MessageId;
@@ -188,7 +189,7 @@ public class StormTupleTest extends AbstractTest {
public void testString() {
final byte[] data = new byte[this.r.nextInt(15)];
this.r.nextBytes(data);
- final String flinkTuple = new String(data);
+ final String flinkTuple = new String(data, ConfigConstants.DEFAULT_CHARSET);
final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null, -1, null, null,
null);
@@ -304,7 +305,7 @@ public class StormTupleTest extends AbstractTest {
public void testStringTuple() {
final byte[] rawdata = new byte[this.r.nextInt(15)];
this.r.nextBytes(rawdata);
- final String data = new String(rawdata);
+ final String data = new String(rawdata, ConfigConstants.DEFAULT_CHARSET);
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
index a72fc14..f7f1bde 100644
--- a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -90,4 +90,4 @@ public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implemen
public TypeInformation<Tweet> getProducedType() {
return new GenericTypeInfo<Tweet>(Tweet.class);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index b934d41..bddaec9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -362,7 +362,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
if (lenient) {
return false;
} else {
- throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
}
}
@@ -380,7 +380,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
if (lenient) {
return false;
} else {
- String lineAsString = new String(bytes, offset, numBytes);
+ String lineAsString = new String(bytes, offset, numBytes, getCharset());
throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+ "ParserError " + parser.getErrorState() + " \n"
+ "Expect field types: "+fieldTypesToString() + " \n"
@@ -405,7 +405,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
if (startPos < 0) {
if (!lenient) {
- String lineAsString = new String(bytes, offset, numBytes);
+ String lineAsString = new String(bytes, offset, numBytes, getCharset());
throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
+ "Expect field types: "+fieldTypesToString()+" \n"
+ "in file: "+filePath);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5129f20..c7c8b1a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,9 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/**
@@ -1428,6 +1431,10 @@ public final class ConfigConstants {
/** The environment variable name which contains the Flink installation root directory */
public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+ // ---------------------------- Encoding ------------------------------
+
+ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
/**
* Not instantiable.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index ddfd30a..abf65b1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -19,6 +19,7 @@
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -97,7 +98,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
}
public String toString() {
- return new String(buffer, 0, count);
+ return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
}
private int getEndPosition() {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 11e459a..4e1aa3e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.math.BigInteger;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link java.math.BigInteger}.
@@ -45,7 +46,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = new BigInteger(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return new BigInteger(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index f8b890a..908c05f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -19,6 +19,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
@PublicEvolving
public class BooleanParser extends FieldParser<Boolean> {
@@ -27,12 +28,12 @@ public class BooleanParser extends FieldParser<Boolean> {
/** Values for true and false respectively. Must be lower case. */
private static final byte[][] TRUE = new byte[][] {
- "true".getBytes(),
- "1".getBytes()
+ "true".getBytes(ConfigConstants.DEFAULT_CHARSET),
+ "1".getBytes(ConfigConstants.DEFAULT_CHARSET)
};
private static final byte[][] FALSE = new byte[][] {
- "false".getBytes(),
- "0".getBytes()
+ "false".getBytes(ConfigConstants.DEFAULT_CHARSET),
+ "0".getBytes(ConfigConstants.DEFAULT_CHARSET)
};
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 2474adf..409cff2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a Double.
@@ -44,7 +45,7 @@ public class DoubleParser extends FieldParser<Double> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Double.parseDouble(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -101,7 +102,7 @@ public class DoubleParser extends FieldParser<Double> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Double.parseDouble(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 10b43c3..8f64691 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.DoubleValue;
/**
@@ -43,7 +44,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
double value = Double.parseDouble(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index e76484e..5636a4e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Float}.
@@ -42,7 +43,7 @@ public class FloatParser extends FieldParser<Float> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Float.parseFloat(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -99,7 +100,7 @@ public class FloatParser extends FieldParser<Float> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Float.parseFloat(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index a834f22..83fe63f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.FloatValue;
/**
@@ -43,7 +44,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
float value = Float.parseFloat(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
index 859dcf8..24374b8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Date;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link java.sql.Date}.
@@ -45,7 +46,7 @@ public class SqlDateParser extends FieldParser<Date> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Date.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlDateParser extends FieldParser<Date> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Date.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
index fbddadc..363cbb9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Time;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Time}.
@@ -39,7 +40,7 @@ public class SqlTimeParser extends FieldParser<Time> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Time.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -96,7 +97,7 @@ public class SqlTimeParser extends FieldParser<Time> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Time.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
index 0bcb602..97443a5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Timestamp;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Timestamp}.
@@ -45,7 +46,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Timestamp.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Timestamp.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 7ce0a2e..2ff5ee7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -73,11 +74,11 @@ public class DelimitedInputFormatTest {
cfg.setString("delimited-format.delimiter", "\n");
format.configure(cfg);
- assertEquals("\n", new String(format.getDelimiter()));
+ assertEquals("\n", new String(format.getDelimiter(), format.getCharset()));
cfg.setString("delimited-format.delimiter", "&-&");
format.configure(cfg);
- assertEquals("&-&", new String(format.getDelimiter()));
+ assertEquals("&-&", new String(format.getDelimiter(), format.getCharset()));
}
@Test
@@ -428,7 +429,7 @@ public class DelimitedInputFormatTest {
@Override
public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
- return new String(bytes, offset, numBytes);
+ return new String(bytes, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5599dd0..dfda372 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
@@ -269,7 +270,7 @@ public class FileInputFormatTest {
File luigiFile = temporaryFolder.newFile("_luigi");
File success = temporaryFolder.newFile("_SUCCESS");
- createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2, luigiFile, success);
// test that only the valid files are accepted
@@ -308,7 +309,7 @@ public class FileInputFormatTest {
File[] files = { child1, child2 };
- createTempFiles(contents.getBytes(), files);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), files);
// test that only the valid files are accepted
@@ -345,7 +346,7 @@ public class FileInputFormatTest {
File child1 = temporaryFolder.newFile("dataFile1.txt");
File child2 = temporaryFolder.newFile("another_file.bin");
- createTempFiles(contents.getBytes(), child1, child2);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2);
// test that only the valid files are accepted
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 655ffd5..674f7e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -113,7 +113,7 @@ public class ValueStateDescriptorTest {
}
data[199000] = '\0';
- String defaultValue = new String(data);
+ String defaultValue = new String(data, ConfigConstants.DEFAULT_CHARSET);
ValueStateDescriptor<String> descr =
new ValueStateDescriptor<String>("testName", serializer, defaultValue);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index 7572408..d85ff95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +36,7 @@ import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class KryoClearedBufferTest {
@@ -262,7 +264,7 @@ public class KryoClearedBufferTest {
@Override
public void writeBytes(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
@@ -270,7 +272,7 @@ public class KryoClearedBufferTest {
@Override
public void writeChars(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
@@ -278,7 +280,7 @@ public class KryoClearedBufferTest {
@Override
public void writeUTF(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 9b02147..51ace12 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -85,9 +86,9 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser2 = getParser();
FieldParser<T> parser3 = getParser();
- byte[] bytes1 = testValues[i].getBytes();
- byte[] bytes2 = testValues[i].getBytes();
- byte[] bytes3 = testValues[i].getBytes();
+ byte[] bytes1 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes2 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes3 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&'}, parser2.createValue());
@@ -132,8 +133,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
String testVal1 = testValues[i] + "|";
String testVal2 = testValues[i] + "&&&&";
- byte[] bytes1 = testVal1.getBytes();
- byte[] bytes2 = testVal2.getBytes();
+ byte[] bytes1 = testVal1.getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes2 = testVal2.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser = getParser();
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
@@ -318,7 +319,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < testValues.length; i++) {
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
T result;
@@ -355,7 +356,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < testValues.length; i++) {
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
try {
parseMethod.invoke(null, bytes, 0, bytes.length, '|');
@@ -389,7 +390,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < values.length; i++) {
String s = values[i];
- byte[] bytes = s.getBytes();
+ byte[] bytes = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numBytes = bytes.length;
System.arraycopy(bytes, 0, result, currPos, numBytes);
currPos += numBytes;
@@ -411,7 +412,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser = getParser();
for (String emptyString : emptyStrings) {
- byte[] bytes = emptyString.getBytes();
+ byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 718274e..e6e6c62 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.types.parser;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
@@ -45,7 +46,7 @@ public class VarLengthStringParserTest {
this.parser = new StringValueParser();
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "abcdefgh|i|jklmno|".getBytes();
+ byte[] recBytes = "abcdefgh|i|jklmno|".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -63,14 +64,14 @@ public class VarLengthStringParserTest {
// check single field not terminated
- recBytes = "abcde".getBytes();
+ recBytes = "abcde".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 5);
assertTrue(s.getValue().equals("abcde"));
// check last field not terminated
- recBytes = "abcde|fg".getBytes();
+ recBytes = "abcde|fg".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 6);
@@ -88,7 +89,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'"');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes();
+ byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -106,14 +107,14 @@ public class VarLengthStringParserTest {
// check single field not terminated
- recBytes = "\"abcde\"".getBytes();
+ recBytes = "\"abcde\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 7);
assertTrue(s.getValue().equals("abcde"));
// check last field not terminated
- recBytes = "\"abcde\"|\"fg\"".getBytes();
+ recBytes = "\"abcde\"|\"fg\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 8);
@@ -124,7 +125,7 @@ public class VarLengthStringParserTest {
assertTrue(s.getValue().equals("fg"));
// check delimiter in quotes
- recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes();
+ recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 11);
@@ -135,7 +136,7 @@ public class VarLengthStringParserTest {
assertTrue(s.getValue().equals("hij|kl|mn|op"));
// check delimiter in quotes last field not terminated
- recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes();
+ recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 11);
@@ -153,7 +154,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'@');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -187,7 +188,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'"');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "\"abcdefgh\"-|\"jklmno ".getBytes();
+ byte[] recBytes = "\"abcdefgh\"-|\"jklmno ".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -207,7 +208,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte) '@');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 4a1556a..c6f46e3 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.test.socket;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
@@ -62,7 +63,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
new String[] { "--port", String.valueOf(serverPort) });
if (errorMessages.size() != 0) {
- fail("Found error message: " + new String(errorMessages.toByteArray()));
+ fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
}
serverThread.join();
@@ -101,7 +102,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
new String[] { "--port", String.valueOf(serverPort) });
if (errorMessages.size() != 0) {
- fail("Found error message: " + new String(errorMessages.toByteArray()));
+ fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
}
serverThread.join();
@@ -154,4 +155,4 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
@Override
public void write(int b) {}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index df68a76..bc42838 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -294,7 +295,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx + ": " + sampleLine + " " + i + "\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
return new Tuple2<>(tmp, str.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 440bfcc..e271a21 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -389,7 +390,7 @@ public class ContinuousFileProcessingMigrationTest {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index f579345..19358e3 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -1042,7 +1043,7 @@ public class ContinuousFileProcessingTest {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 75e666f..8a3f662 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
@@ -80,7 +81,7 @@ public class HDFSTest {
hdfs = hdPath.getFileSystem(hdConf);
FSDataOutputStream stream = hdfs.create(hdPath);
for(int i = 0; i < 10; i++) {
- stream.write("Hello HDFS\n".getBytes());
+ stream.write("Hello HDFS\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
@@ -193,7 +194,7 @@ public class HDFSTest {
fs.mkdirs(directory);
- byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes();
+ byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);
for (Path file: Arrays.asList(singleFile, directoryFile)) {
org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 05ed6fa..d454765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -78,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
if (parser.resetErrorStateAndParse(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) {
return parser.getLastResult();
} else {
- String s = new String(bytes, offset, numBytes);
+ String s = new String(bytes, offset, numBytes, getCharset());
throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index ce37c74..b752966 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -155,7 +155,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
if (isLenient()) {
return false;
} else {
- throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index a303ff7..d047aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -79,7 +80,7 @@ public class CsvInputFormatTest {
tempFile.deleteOnExit();
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
- fileOutputStream.write(fileContent.getBytes());
+ fileOutputStream.write(fileContent.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// fix the number of blocks and the size of each one.
@@ -793,7 +794,8 @@ public class CsvInputFormatTest {
for (Object[] failure : failures) {
String input = (String) failure[0];
- int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null);
+ int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
+ input.length(), new byte[]{'|'}, null);
assertThat(result, is(-1));
assertThat(stringParser.getErrorState(), is(failure[1]));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index f6bda30..943db36 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -701,7 +702,7 @@ public class RowCsvInputFormatTest {
for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
int result = stringParser.parseField(
- failure.getKey().getBytes(),
+ failure.getKey().getBytes(ConfigConstants.DEFAULT_CHARSET),
0,
failure.getKey().length(),
new byte[]{(byte) '|'},
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index d89fc41..3d79b08 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -13,6 +13,7 @@
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
/*
Utility function to deserialize strings, used for non-CSV sinks.
@@ -21,6 +22,6 @@ public class StringDeserializerMap implements MapFunction<byte[], String> {
@Override
public String map(byte[] value) throws Exception {
//discard type byte and size
- return new String(value, 5, value.length - 5);
+ return new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index b6d60e1..af5eac6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -14,6 +14,7 @@ package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
/*
Utility function to deserialize strings, used for CSV sinks.
@@ -22,6 +23,6 @@ public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<St
@Override
public Tuple1<String> map(byte[] value) throws Exception {
//5 = string type byte + string size
- return new Tuple1<>(new String(value, 5, value.length - 5));
+ return new Tuple1<>(new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..c968bd6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,19 @@
*/
package org.apache.flink.python.api.streaming.data;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
@@ -23,9 +33,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
@@ -33,11 +41,6 @@ import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAM
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This streamer is used by functions to send/receive data to/from an external python process.
@@ -127,12 +130,13 @@ public class PythonStreamer implements Serializable {
Runtime.getRuntime().addShutdownHook(shutdownThread);
OutputStream processOutput = process.getOutputStream();
- processOutput.write("operator\n".getBytes());
- processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
- processOutput.write((id + "\n").getBytes());
- processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
- processOutput.write((inputFilePath + "\n").getBytes());
- processOutput.write((outputFilePath + "\n").getBytes());
+ processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((id + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
+ .getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
processOutput.flush();
try { // wait a bit to catch syntax errors