You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/09 12:02:26 UTC
[1/9] flink git commit: [FLINK-5824] Fix String/byte conversions
without explicit encoding
Repository: flink
Updated Branches:
refs/heads/master 65ccf7cdf -> 2592a19cc
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index a54b8dd..da83a06 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -27,6 +27,8 @@ import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_LONG;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_NULL;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_STRING;
+
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.python.api.types.CustomTypeWrapper;
/**
@@ -177,7 +179,7 @@ public class PythonPlanReceiver implements Serializable {
int size = input.readInt();
byte[] buffer = new byte[size];
input.readFully(buffer);
- return new String(buffer);
+ return new String(buffer, ConfigConstants.DEFAULT_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..06af9d8 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -12,11 +12,14 @@
*/
package org.apache.flink.python.api.streaming.plan;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
+
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
@@ -82,8 +85,8 @@ public class PythonPlanStreamer implements Serializable {
} catch (IllegalThreadStateException ise) {//Process still running
}
- process.getOutputStream().write("plan\n".getBytes());
- process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
+ process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
process.getOutputStream().flush();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index fce69fa..f228327 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -12,11 +12,13 @@
*/
package org.apache.flink.python.api.streaming.util;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.python.api.types.CustomTypeWrapper;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
public class SerializationUtils {
public static final byte TYPE_BOOLEAN = (byte) 34;
public static final byte TYPE_BYTE = (byte) 33;
@@ -168,7 +170,7 @@ public class SerializationUtils {
public static class StringSerializer extends Serializer<String> {
@Override
public byte[] serializeWithoutTypeInfo(String value) {
- byte[] string = value.getBytes();
+ byte[] string = value.getBytes(ConfigConstants.DEFAULT_CHARSET);
byte[] data = new byte[4 + string.length];
ByteBuffer.wrap(data).putInt(string.length).put(string);
return data;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
index 5ff3572..30a728c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -12,6 +12,8 @@
*/
package org.apache.flink.python.api.streaming.util;
+import org.apache.flink.configuration.ConfigConstants;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -30,7 +32,7 @@ public class StreamPrinter extends Thread {
}
public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
- this.reader = new BufferedReader(new InputStreamReader(stream));
+ this.reader = new BufferedReader(new InputStreamReader(stream, ConfigConstants.DEFAULT_CHARSET));
this.wrapInException = wrapInException;
this.msg = msg;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 5246b94..42abd4c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
@@ -117,7 +118,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
if (value.length == 0) {
frameworkID = Option.empty();
} else {
- frameworkID = Option.apply(Protos.FrameworkID.newBuilder().setValue(new String(value)).build());
+ frameworkID = Option.apply(Protos.FrameworkID.newBuilder().setValue(new String(value,
+ ConfigConstants.DEFAULT_CHARSET)).build());
}
return frameworkID;
@@ -134,7 +136,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
synchronized (startStopLock) {
verifyIsRunning();
- byte[] value = frameworkID.isDefined() ? frameworkID.get().getValue().getBytes() : new byte[0];
+ byte[] value = frameworkID.isDefined() ? frameworkID.get().getValue().getBytes(ConfigConstants.DEFAULT_CHARSET) :
+ new byte[0];
frameworkIdInZooKeeper.setValue(value);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 42fe6a5..113107f 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -36,6 +36,7 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -187,7 +188,7 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
private void send(final String name, final String value) {
try {
String formatted = String.format("%s:%s|g", name, value);
- byte[] data = formatted.getBytes();
+ byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
socket.send(new DatagramPacket(data, data.length, this.address));
}
catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index e53ef44..17fd65a 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -382,7 +382,7 @@ public class StatsDReporterTest extends TestLogger {
socket.receive(packet);
- String line = new String(packet.getData(), 0, packet.getLength());
+ String line = new String(packet.getData(), 0, packet.getLength(), ConfigConstants.DEFAULT_CHARSET);
lines.put(line, obj);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index 585a2f3..d14b7a2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -48,7 +48,7 @@ import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
-
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.ExceptionUtils;
import java.io.File;
@@ -65,7 +65,7 @@ import java.util.UUID;
@ChannelHandler.Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
- private static final Charset ENCODING = Charset.forName("UTF-8");
+ private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
/** A decoder factory that always stores POST chunks on disk */
private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
index b4788dd..85b3b13 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java
@@ -26,9 +26,8 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
-
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.ExceptionUtils;
-
import org.slf4j.Logger;
/**
@@ -61,7 +60,8 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {
private void sendError(ChannelHandlerContext ctx, String error) {
if (ctx.channel().isActive()) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(error.getBytes()));
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ Unpooled.wrappedBuffer(error.getBytes(ConfigConstants.DEFAULT_CHARSET)));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 8bd58a3..3d7fbed 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -30,6 +30,7 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.util.ExceptionUtils;
@@ -59,7 +60,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class);
- private static final Charset ENCODING = Charset.forName("UTF-8");
+ private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
index 127efdb..53f9f04 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
@@ -29,8 +29,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;
-
-import java.io.UnsupportedEncodingException;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Responder that returns a constant String.
@@ -41,12 +40,7 @@ public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
private final byte[] encodedText;
public ConstantTextHandler(String text) {
- try {
- this.encodedText = text.getBytes("UTF-8");
- }
- catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
+ this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index ca61ec1..6616a2a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
@@ -32,8 +33,6 @@ import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -94,9 +93,9 @@ public class HandlerRedirectUtils {
return redirectResponse;
}
- public static HttpResponse getUnavailableResponse() throws UnsupportedEncodingException {
+ public static HttpResponse getUnavailableResponse() {
String result = "Service temporarily unavailable due to an ongoing leader election. Please refresh.";
- byte[] bytes = result.getBytes(Charset.forName("UTF-8"));
+ byte[] bytes = result.getBytes(ConfigConstants.DEFAULT_CHARSET);
HttpResponse unavailableResponse = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index b618d85..f5d6853 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -62,7 +62,7 @@ public class JobCancellationWithSavepointHandlers {
private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
/** Encodings for String. */
- private static final Charset ENCODING = Charset.forName("UTF-8");
+ private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
/** Shared lock between Trigger and In-Progress handlers. */
private final Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 1002bf3..37ee814 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -46,6 +46,7 @@ import io.netty.handler.stream.ChunkedFile;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
@@ -345,7 +346,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
- byte[] buf = message.getBytes();
+ byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
ByteBuf b = Unpooled.copiedBuffer(buf);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 1db1369..4177f44 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.Routed;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
@@ -139,7 +140,7 @@ public class TaskManagerLogHandlerTest {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
- exception.set(new String(data.array()));
+ exception.set(new String(data.array(), ConfigConstants.DEFAULT_CHARSET));
return null;
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index b5ba565..c540f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.blob;
import com.google.common.io.BaseEncoding;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.highavailability.ZookeeperHaServices;
@@ -64,7 +65,7 @@ public class BlobUtils {
/**
* The default character set to translate between characters and bytes.
*/
- static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
+ static final Charset DEFAULT_CHARSET = ConfigConstants.DEFAULT_CHARSET;
/**
* Creates a BlobStore based on the parameters set in the configuration.
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
index 5184db8..08ec35e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -94,11 +95,12 @@ public class SavepointV1Test {
for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
StreamStateHandle nonPartitionableState =
- new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes());
+ new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes(
+ ConfigConstants.DEFAULT_CHARSET));
StreamStateHandle operatorStateBackend =
- new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes());
+ new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
StreamStateHandle operatorStateStream =
- new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes());
+ new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
@@ -127,13 +129,15 @@ public class SavepointV1Test {
if (hasKeyedBackend) {
keyedStateBackend = new KeyGroupsStateHandle(
new KeyGroupRangeOffsets(1, 1, new long[]{42}),
- new TestByteStreamStateHandleDeepCompare("c", "Hello".getBytes()));
+ new TestByteStreamStateHandleDeepCompare("c", "Hello"
+ .getBytes(ConfigConstants.DEFAULT_CHARSET)));
}
if (hasKeyedStream) {
keyedStateStream = new KeyGroupsStateHandle(
new KeyGroupRangeOffsets(1, 1, new long[]{23}),
- new TestByteStreamStateHandleDeepCompare("d", "World".getBytes()));
+ new TestByteStreamStateHandleDeepCompare("d", "World"
+ .getBytes(ConfigConstants.DEFAULT_CHARSET)));
}
taskState.putState(subtaskIdx, new SubtaskState(
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
index b5b4d76..74de096 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
@@ -19,12 +19,13 @@
package org.apache.flink.runtime.io.network.api.serialization.types;
-import java.io.IOException;
-import java.util.Random;
-
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.util.Random;
+
public class AsciiStringType implements SerializationTestType {
private static final int MAX_LEN = 1500;
@@ -54,7 +55,7 @@ public class AsciiStringType implements SerializationTestType {
@Override
public int length() {
- return value.getBytes().length + 2;
+ return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f9aea89..1fd004f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
@@ -27,13 +27,12 @@ import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class DataSinkTaskTest extends TaskTestBase {
@@ -468,7 +467,7 @@ public class DataSinkTaskTest extends TaskTestBase {
this.bld.append(value.getValue());
this.bld.append('\n');
- byte[] bytes = this.bld.toString().getBytes();
+ byte[] bytes = this.bld.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
this.stream.write(bytes);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 1ebdcb0..82f3d1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -20,6 +20,7 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -247,7 +248,7 @@ public class DataSourceTaskTest extends TaskTestBase {
@Override
public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
- String line = new String(record, offset, numBytes);
+ String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
try {
this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
@@ -290,7 +291,7 @@ public class DataSourceTaskTest extends TaskTestBase {
return null;
}
- String line = new String(record, offset, numBytes);
+ String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
try {
this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
@@ -324,7 +325,7 @@ public class DataSourceTaskTest extends TaskTestBase {
this.cnt++;
- String line = new String(record, offset, numBytes);
+ String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
try {
this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 8617193..980075d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -107,7 +108,7 @@ public class FsCheckpointStateOutputStreamTest {
stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);
- byte[] data = "testme!".getBytes();
+ byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
for (int i = 0; i < 7; ++i) {
Assert.assertEquals(i * (1 + data.length), stream.getPos());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 2de4c01..7f5a841 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Very simple serialization schema for strings.
@@ -31,7 +32,7 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
@Override
public String deserialize(byte[] message) {
- return new String(message);
+ return new String(message, ConfigConstants.DEFAULT_CHARSET);
}
@Override
@@ -41,7 +42,7 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
@Override
public byte[] serialize(String element) {
- return element.getBytes();
+ return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 23491da..877e707 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink;
import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.TestLogger;
@@ -55,7 +56,7 @@ public class SocketClientSinkTest extends TestLogger {
private SerializationSchema<String> simpleSchema = new SerializationSchema<String>() {
@Override
public byte[] serialize(String element) {
- return element.getBytes();
+ return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
}
};
@@ -300,4 +301,4 @@ public class SocketClientSinkTest extends TestLogger {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 90d8861..0ad42bb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -340,7 +341,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
for(int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
return new Tuple2<>(tmp, str.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..8fdf74b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.misc;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParserTest;
+import org.apache.flink.types.parser.VarLengthStringParserTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckForbiddenMethodsUsage {
+
+ private static class ForbiddenCall {
+ private final Method method;
+ private final Constructor constructor;
+ private final List<Member> exclusions;
+
+ public Method getMethod() {
+ return method;
+ }
+
+ public List<Member> getExclusions() {
+ return exclusions;
+ }
+
+ private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
+ this.method = method;
+ this.exclusions = exclusions;
+ this.constructor = ctor;
+ }
+
+ public static ForbiddenCall of(Method method) {
+ return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Method method, List<Member> exclusions) {
+ return new ForbiddenCall(method, null, exclusions);
+ }
+
+ public static ForbiddenCall of(Constructor ctor) {
+ return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
+ return new ForbiddenCall(null, ctor, exclusions);
+ }
+
+ public Set<Member> getUsages(Reflections reflections) {
+ if (method == null) {
+ return reflections.getConstructorUsage(constructor);
+ }
+
+ return reflections.getMethodUsage(method);
+ }
+ }
+
+ private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+ @BeforeClass
+ public static void init() throws Exception {
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+ Lists.<Member>newArrayList(
+ FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+ FieldParserTest.class.getMethod("testDelimiterNext")
+ )));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+ }
+
+ @Test
+ public void testNoDefaultEncoding() throws Exception {
+ final Reflections reflections = new Reflections(new ConfigurationBuilder()
+ .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+ .addScanners(new MemberUsageScanner()));
+
+
+ for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+ final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+ methodUsages.removeAll(forbiddenCall.getExclusions());
+ assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+ }
+ }
+}
[9/9] flink git commit: [FLINK-5824] (followup) Minor code cleanups
in CheckForbiddenMethodsUsage
Posted by se...@apache.org.
[FLINK-5824] (followup) Minor code cleanups in CheckForbiddenMethodsUsage
- Move to maual tests package
- Adjust order of methods (ctor, instance, class)
- Replace Guava with Java Util
- Make charset in SimpleStringSchema configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab91cc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab91cc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab91cc5
Branch: refs/heads/master
Commit: 3ab91cc5d233265b7c4a1497e7294c9065261dec
Parents: 53fedbd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 8 16:10:52 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../util/serialization/SimpleStringSchema.java | 61 ++++++++-
.../serialization/SimpleStringSchemaTest.java | 51 ++++++++
.../test/manual/CheckForbiddenMethodsUsage.java | 126 +++++++++++++++++++
.../test/misc/CheckForbiddenMethodsUsage.java | 115 -----------------
4 files changed, 235 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 7f5a841..ddc55a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -20,19 +20,59 @@ package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigConstants;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Very simple serialization schema for strings.
+ *
+ * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
*/
@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;
+ /** The charset to use to convert between strings and bytes.
+ * The field is transient because we serialize a different delegate object instead */
+ private transient Charset charset;
+
+ /**
+ * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+ */
+ public SimpleStringSchema() {
+ this(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
+ *
+ * @param charset The charset to use to convert between strings and bytes.
+ */
+ public SimpleStringSchema(Charset charset) {
+ this.charset = checkNotNull(charset);
+ }
+
+ /**
+ * Gets the charset used by this schema for serialization.
+ * @return The charset used by this schema for serialization.
+ */
+ public Charset getCharset() {
+ return charset;
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Serialization
+ // ------------------------------------------------------------------------
+
@Override
public String deserialize(byte[] message) {
- return new String(message, ConfigConstants.DEFAULT_CHARSET);
+ return new String(message, charset);
}
@Override
@@ -42,11 +82,26 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
@Override
public byte[] serialize(String element) {
- return element.getBytes(ConfigConstants.DEFAULT_CHARSET);
+ return element.getBytes(charset);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
+
+ // ------------------------------------------------------------------------
+ // Java Serialization
+ // ------------------------------------------------------------------------
+
+ private void writeObject (ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeUTF(charset.name());
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ String charsetName = in.readUTF();
+ this.charset = Charset.forName(charsetName);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
new file mode 100644
index 0000000..74b1d18
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link SimpleStringSchema}.
+ */
+public class SimpleStringSchemaTest {
+
+ @Test
+ public void testSerializationWithAnotherCharset() {
+ final Charset charset = StandardCharsets.UTF_16BE;
+ final String string = "\u4e4b\u6383\u63cf\u53e4\u7c4d\u7248\u5be6\u4e43\u59da\u9f10\u7684";
+ final byte[] bytes = string.getBytes(charset);
+
+ assertArrayEquals(bytes, new SimpleStringSchema(charset).serialize(string));
+ assertEquals(string, new SimpleStringSchema(charset).deserialize(bytes));
+ }
+
+ @Test
+ public void testSerializability() throws Exception {
+ final SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_16LE);
+ final SimpleStringSchema copy = CommonTestUtils.createCopySerializable(schema);
+
+ assertEquals(schema.getCharset(), copy.getCharset());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
new file mode 100644
index 0000000..aabe7c0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.manual;
+
+import org.apache.flink.types.parser.FieldParserTest;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests via reflection that certain methods are not called in Flink.
+ *
+ * <p>Forbidden calls include:
+ * - Byte / String conversions that do not specify an explicit charset
+ * because they produce different results in different locales
+ */
+public class CheckForbiddenMethodsUsage {
+
+ private static class ForbiddenCall {
+
+ private final Method method;
+ private final Constructor<?> constructor;
+ private final List<Member> exclusions;
+
+ private ForbiddenCall(Method method, Constructor<?> ctor, List<Member> exclusions) {
+ this.method = method;
+ this.exclusions = exclusions;
+ this.constructor = ctor;
+ }
+
+ public Method getMethod() {
+ return method;
+ }
+
+ public List<Member> getExclusions() {
+ return exclusions;
+ }
+
+ public Set<Member> getUsages(Reflections reflections) {
+ if (method == null) {
+ return reflections.getConstructorUsage(constructor);
+ }
+
+ return reflections.getMethodUsage(method);
+ }
+
+ public static ForbiddenCall of(Method method) {
+ return new ForbiddenCall(method, null, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Method method, List<Member> exclusions) {
+ return new ForbiddenCall(method, null, exclusions);
+ }
+
+ public static ForbiddenCall of(Constructor<?> ctor) {
+ return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
+ }
+
+ public static ForbiddenCall of(Constructor<?> ctor, List<Member> exclusions) {
+ return new ForbiddenCall(null, ctor, exclusions);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
+
+ @BeforeClass
+ public static void init() throws Exception {
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
+ Arrays.<Member>asList(
+ FieldParserTest.class.getMethod("testEndsWithDelimiter"),
+ FieldParserTest.class.getMethod("testDelimiterNext")
+ )));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
+ forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
+ }
+
+ @Test
+ public void testNoDefaultEncoding() throws Exception {
+ final Reflections reflections = new Reflections(new ConfigurationBuilder()
+ .useParallelExecutor(Runtime.getRuntime().availableProcessors())
+ .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
+ .addScanners(new MemberUsageScanner()));
+
+
+ for (ForbiddenCall forbiddenCall : forbiddenCalls) {
+ final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
+ methodUsages.removeAll(forbiddenCall.getExclusions());
+ assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ab91cc5/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
deleted file mode 100644
index 8fdf74b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CheckForbiddenMethodsUsage.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.misc;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FieldParserTest;
-import org.apache.flink.types.parser.VarLengthStringParserTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.MemberUsageScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Member;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class CheckForbiddenMethodsUsage {
-
- private static class ForbiddenCall {
- private final Method method;
- private final Constructor constructor;
- private final List<Member> exclusions;
-
- public Method getMethod() {
- return method;
- }
-
- public List<Member> getExclusions() {
- return exclusions;
- }
-
- private ForbiddenCall(Method method, Constructor ctor, List<Member> exclusions) {
- this.method = method;
- this.exclusions = exclusions;
- this.constructor = ctor;
- }
-
- public static ForbiddenCall of(Method method) {
- return new ForbiddenCall(method, null, Collections.<Member>emptyList());
- }
-
- public static ForbiddenCall of(Method method, List<Member> exclusions) {
- return new ForbiddenCall(method, null, exclusions);
- }
-
- public static ForbiddenCall of(Constructor ctor) {
- return new ForbiddenCall(null, ctor, Collections.<Member>emptyList());
- }
-
- public static ForbiddenCall of(Constructor ctor, List<Member> exclusions) {
- return new ForbiddenCall(null, ctor, exclusions);
- }
-
- public Set<Member> getUsages(Reflections reflections) {
- if (method == null) {
- return reflections.getConstructorUsage(constructor);
- }
-
- return reflections.getMethodUsage(method);
- }
- }
-
- private static List<ForbiddenCall> forbiddenCalls = new ArrayList<>();
-
- @BeforeClass
- public static void init() throws Exception {
- forbiddenCalls.add(ForbiddenCall.of(String.class.getMethod("getBytes"),
- Lists.<Member>newArrayList(
- FieldParserTest.class.getMethod("testEndsWithDelimiter"),
- FieldParserTest.class.getMethod("testDelimiterNext")
- )));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class)));
- forbiddenCalls.add(ForbiddenCall.of(String.class.getConstructor(byte[].class, int.class, int.class, int.class)));
- }
-
- @Test
- public void testNoDefaultEncoding() throws Exception {
- final Reflections reflections = new Reflections(new ConfigurationBuilder()
- .addUrls(ClasspathHelper.forPackage("org.apache.flink"))
- .addScanners(new MemberUsageScanner()));
-
-
- for (ForbiddenCall forbiddenCall : forbiddenCalls) {
- final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
- methodUsages.removeAll(forbiddenCall.getExclusions());
- assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
- }
- }
-}
[3/9] flink git commit: [FLINK-5598] [web frontend] Return filename
after jar upload
Posted by se...@apache.org.
[FLINK-5598] [web frontend] Return filename after jar upload
This closes #3469
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84afd068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84afd068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84afd068
Branch: refs/heads/master
Commit: 84afd068ac229e4d598fffdf10075e9d356fbe07
Parents: 59f0f7a
Author: Fabian Wollert <da...@gmail.com>
Authored: Fri Mar 3 17:40:02 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/webmonitor/handlers/JarUploadHandler.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84afd068/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 3d7cb8a..ec8516d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -59,10 +59,11 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
return "{\"error\": \"Only Jar files are allowed.\"}";
}
- File newFile = new File(jarDir, UUID.randomUUID() + "_" + filename);
+ String filenameWithUUID = UUID.randomUUID() + "_" + filename;
+ File newFile = new File(jarDir, filenameWithUUID);
if (tempFile.renameTo(newFile)) {
// all went well
- return "{}";
+ return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
}
else {
//noinspection ResultOfMethodCallIgnored
[8/9] flink git commit: [FLINK-5135] [runtime] Expand the fields in
ResourceProfile based on ResourceSpec
Posted by se...@apache.org.
[FLINK-5135] [runtime] Expand the fields in ResourceProfile based on ResourceSpec
This closes #3457
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2592a19c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2592a19c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2592a19c
Branch: refs/heads/master
Commit: 2592a19ccd4f83e4f57a835b03c7846d6edde927
Parents: 527eabd
Author: \u6dd8\u6c5f <ta...@alibaba-inc.com>
Authored: Thu Mar 2 18:23:42 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../clusterframework/types/ResourceProfile.java | 104 +++++++++++++++----
.../taskexecutor/TaskManagerServices.java | 2 +-
.../types/ResourceProfileTest.java | 8 +-
.../slotmanager/SlotManagerTest.java | 2 +-
.../taskexecutor/TaskExecutorITCase.java | 2 +-
5 files changed, 91 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ddc7547..faa93e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -36,27 +36,54 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
private static final long serialVersionUID = 1L;
- public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1L);
+ public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
// ------------------------------------------------------------------------
/** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
private final double cpuCores;
- /** How many memory in mb are needed */
- private final long memoryInMB;
+ /** How many heap memory in mb are needed */
+ private final int heapMemoryInMB;
+
+ /** How many direct memory in mb are needed */
+ private final int directMemoryInMB;
+
+ /** How many native memory in mb are needed */
+ private final int nativeMemoryInMB;
// ------------------------------------------------------------------------
/**
* Creates a new ResourceProfile.
- *
- * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
- * @param memoryInMB The size of the memory, in megabytes.
+ *
+ * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
+ * @param heapMemoryInMB The size of the heap memory, in megabytes.
+ * @param directMemoryInMB The size of the direct memory, in megabytes.
+ * @param nativeMemoryInMB The size of the native memory, in megabytes.
*/
- public ResourceProfile(double cpuCores, long memoryInMB) {
+ public ResourceProfile(
+ double cpuCores,
+ int heapMemoryInMB,
+ int directMemoryInMB,
+ int nativeMemoryInMB) {
this.cpuCores = cpuCores;
- this.memoryInMB = memoryInMB;
+ this.heapMemoryInMB = heapMemoryInMB;
+ this.directMemoryInMB = directMemoryInMB;
+ this.nativeMemoryInMB = nativeMemoryInMB;
+ }
+
+ /**
+ * Creates a new simple ResourceProfile used for testing.
+ *
+ * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
+ * @param heapMemoryInMB The size of the heap memory, in megabytes.
+ */
+ public ResourceProfile(double cpuCores, int heapMemoryInMB) {
+ this.cpuCores = cpuCores;
+ this.heapMemoryInMB = heapMemoryInMB;
+ this.directMemoryInMB = 0;
+ this.nativeMemoryInMB = 0;
}
/**
@@ -66,7 +93,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
*/
public ResourceProfile(ResourceProfile other) {
this.cpuCores = other.cpuCores;
- this.memoryInMB = other.memoryInMB;
+ this.heapMemoryInMB = other.heapMemoryInMB;
+ this.directMemoryInMB = other.directMemoryInMB;
+ this.nativeMemoryInMB = other.nativeMemoryInMB;
}
// ------------------------------------------------------------------------
@@ -80,11 +109,35 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
}
/**
- * Get the memory needed in MB
- * @return The memory in MB
+ * Get the heap memory needed in MB
+ * @return The heap memory in MB
+ */
+ public long getHeapMemoryInMB() {
+ return heapMemoryInMB;
+ }
+
+ /**
+ * Get the direct memory needed in MB
+ * @return The direct memory in MB
+ */
+ public int getDirectMemoryInMB() {
+ return directMemoryInMB;
+ }
+
+ /**
+ * Get the native memory needed in MB
+ * @return The native memory in MB
+ */
+ public int getNativeMemoryInMB() {
+ return nativeMemoryInMB;
+ }
+
+ /**
+ * Get the total memory needed in MB
+ * @return The total memory in MB
*/
- public long getMemoryInMB() {
- return memoryInMB;
+ public int getMemoryInMB() {
+ return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
}
/**
@@ -94,22 +147,29 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @return true if the requirement is matched, otherwise false
*/
public boolean isMatching(ResourceProfile required) {
- return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
+ return cpuCores >= required.getCpuCores() &&
+ heapMemoryInMB >= required.getHeapMemoryInMB() &&
+ directMemoryInMB >= required.getDirectMemoryInMB() &&
+ nativeMemoryInMB >= required.getNativeMemoryInMB();
}
@Override
public int compareTo(@Nonnull ResourceProfile other) {
- int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB);
+ int cmp1 = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
- return (cmp1 != 0) ? cmp1 : cmp2;
+ return (cmp1 != 0) ? cmp1 : cmp2;
}
// ------------------------------------------------------------------------
@Override
public int hashCode() {
- long cpuBits = Double.doubleToRawLongBits(cpuCores);
- return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32));
+ final long cpuBits = Double.doubleToLongBits(cpuCores);
+ int result = (int) (cpuBits ^ (cpuBits >>> 32));
+ result = 31 * result + heapMemoryInMB;
+ result = 31 * result + directMemoryInMB;
+ result = 31 * result + nativeMemoryInMB;
+ return result;
}
@Override
@@ -119,7 +179,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
}
else if (obj != null && obj.getClass() == ResourceProfile.class) {
ResourceProfile that = (ResourceProfile) obj;
- return this.cpuCores == that.cpuCores && this.memoryInMB == that.memoryInMB;
+ return this.cpuCores == that.cpuCores &&
+ this.heapMemoryInMB == that.heapMemoryInMB &&
+ this.directMemoryInMB == that.directMemoryInMB;
}
else {
return false;
@@ -130,7 +192,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
public String toString() {
return "ResourceProfile{" +
"cpuCores=" + cpuCores +
- ", memoryInMB=" + memoryInMB +
+ ", heapMemoryInMB=" + heapMemoryInMB +
+ ", directMemoryInMB=" + directMemoryInMB +
+ ", nativeMemoryInMB=" + nativeMemoryInMB +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ae5a383..19c5c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -201,7 +201,7 @@ public class TaskManagerServices {
final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
- resourceProfiles.add(new ResourceProfile(1.0, 42L));
+ resourceProfiles.add(new ResourceProfile(1.0, 42));
}
final TimerService<AllocationID> timerService = new TimerService<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index cd1d895..aacdcfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -27,10 +27,10 @@ public class ResourceProfileTest {
@Test
public void testMatchRequirement() throws Exception {
- ResourceProfile rp1 = new ResourceProfile(1.0, 100);
- ResourceProfile rp2 = new ResourceProfile(1.0, 200);
- ResourceProfile rp3 = new ResourceProfile(2.0, 100);
- ResourceProfile rp4 = new ResourceProfile(2.0, 200);
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100);
+ ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200);
+ ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100);
+ ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200);
assertFalse(rp1.isMatching(rp2));
assertTrue(rp2.isMatching(rp1));
http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 948c129..041747d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -53,7 +53,7 @@ public class SlotManagerTest {
private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
- private static final long DEFAULT_TESTING_MEMORY = 512;
+ private static final int DEFAULT_TESTING_MEMORY = 512;
private static final ResourceProfile DEFAULT_TESTING_PROFILE =
new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/2592a19c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 36fd65b..0f884f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -86,7 +86,7 @@ public class TaskExecutorITCase {
final String jmAddress = "jm";
final UUID jmLeaderId = UUID.randomUUID();
final JobID jobId = new JobID();
- final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1L);
+ final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
[2/9] flink git commit: [FLINK-5824] Fix String/byte conversions
without explicit encoding
Posted by se...@apache.org.
[FLINK-5824] Fix String/byte conversions without explicit encoding
This closes #3468
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53fedbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53fedbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53fedbd2
Branch: refs/heads/master
Commit: 53fedbd2894c6c7b839d8fdcc0dbf1e6e21e631a
Parents: 84afd06
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Fri Mar 3 13:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100
----------------------------------------------------------------------
.../flink/api/java/io/AvroOutputFormat.java | 9 +-
.../kafka/internals/ZookeeperOffsetHandler.java | 5 +-
.../testutils/ZooKeeperStringSerializer.java | 9 +-
.../manualtests/ManualConsumerProducerTest.java | 3 +-
...nualExactlyOnceWithStreamReshardingTest.java | 4 +-
.../kinesis/manualtests/ManualProducerTest.java | 3 +-
.../testutils/FakeKinesisBehavioursFactory.java | 5 +-
.../nifi/examples/NiFiSinkTopologyExample.java | 4 +-
.../connectors/rabbitmq/RMQSourceTest.java | 7 +-
.../typeutils/runtime/StringArrayWritable.java | 5 +-
.../hbase/example/HBaseFlinkTestConstants.java | 6 +-
.../addons/hbase/example/HBaseWriteExample.java | 5 +-
.../state/RocksDBKeyedStateBackend.java | 8 +-
.../state/RocksDBMergeIteratorTest.java | 3 +-
.../flink/storm/wrappers/StormTupleTest.java | 5 +-
.../io/SimpleTweetInputFormat.java | 2 +-
.../api/common/io/GenericCsvInputFormat.java | 6 +-
.../flink/configuration/ConfigConstants.java | 7 ++
.../memory/ByteArrayOutputStreamWithPos.java | 3 +-
.../apache/flink/types/parser/BigIntParser.java | 5 +-
.../flink/types/parser/BooleanParser.java | 9 +-
.../apache/flink/types/parser/DoubleParser.java | 5 +-
.../flink/types/parser/DoubleValueParser.java | 3 +-
.../apache/flink/types/parser/FloatParser.java | 5 +-
.../flink/types/parser/FloatValueParser.java | 3 +-
.../flink/types/parser/SqlDateParser.java | 5 +-
.../flink/types/parser/SqlTimeParser.java | 5 +-
.../flink/types/parser/SqlTimestampParser.java | 5 +-
.../api/common/io/DelimitedInputFormatTest.java | 7 +-
.../api/common/io/FileInputFormatTest.java | 7 +-
.../common/state/ValueStateDescriptorTest.java | 2 +-
.../runtime/kryo/KryoClearedBufferTest.java | 8 +-
.../flink/types/parser/ParserTestBase.java | 21 ++--
.../types/parser/VarLengthStringParserTest.java | 23 ++--
.../socket/SocketWindowWordCountITCase.java | 7 +-
.../ContinuousFileProcessingITCase.java | 3 +-
.../ContinuousFileProcessingMigrationTest.java | 3 +-
.../hdfstests/ContinuousFileProcessingTest.java | 3 +-
.../org/apache/flink/hdfstests/HDFSTest.java | 5 +-
.../flink/api/java/io/PrimitiveInputFormat.java | 2 +-
.../flink/api/java/io/RowCsvInputFormat.java | 2 +-
.../flink/api/java/io/CsvInputFormatTest.java | 6 +-
.../api/java/io/RowCsvInputFormatTest.java | 3 +-
.../functions/util/StringDeserializerMap.java | 3 +-
.../util/StringTupleDeserializerMap.java | 3 +-
.../api/streaming/data/PythonStreamer.java | 34 +++---
.../api/streaming/plan/PythonPlanReceiver.java | 4 +-
.../api/streaming/plan/PythonPlanStreamer.java | 9 +-
.../api/streaming/util/SerializationUtils.java | 8 +-
.../api/streaming/util/StreamPrinter.java | 4 +-
.../store/ZooKeeperMesosWorkerStore.java | 7 +-
.../flink/metrics/statsd/StatsDReporter.java | 3 +-
.../metrics/statsd/StatsDReporterTest.java | 2 +-
.../runtime/webmonitor/HttpRequestHandler.java | 4 +-
.../webmonitor/PipelineErrorHandler.java | 6 +-
.../webmonitor/RuntimeMonitorHandler.java | 3 +-
.../handlers/ConstantTextHandler.java | 10 +-
.../handlers/HandlerRedirectUtils.java | 7 +-
.../JobCancellationWithSavepointHandlers.java | 2 +-
.../handlers/TaskManagerLogHandler.java | 3 +-
.../handlers/TaskManagerLogHandlerTest.java | 3 +-
.../apache/flink/runtime/blob/BlobUtils.java | 3 +-
.../checkpoint/savepoint/SavepointV1Test.java | 14 ++-
.../serialization/types/AsciiStringType.java | 9 +-
.../runtime/operators/DataSinkTaskTest.java | 9 +-
.../runtime/operators/DataSourceTaskTest.java | 7 +-
.../FsCheckpointStateOutputStreamTest.java | 3 +-
.../util/serialization/SimpleStringSchema.java | 5 +-
.../functions/sink/SocketClientSinkTest.java | 5 +-
...ontinuousFileProcessingCheckpointITCase.java | 3 +-
.../test/misc/CheckForbiddenMethodsUsage.java | 115 +++++++++++++++++++
71 files changed, 359 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 600d1e5..1db45a5 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -25,13 +25,14 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
import java.io.IOException;
import java.io.Serializable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
/**
@@ -154,7 +155,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
}
if(userDefinedSchema != null) {
- byte[] json = userDefinedSchema.toString().getBytes();
+ byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(json.length);
out.write(json);
} else {
@@ -175,7 +176,7 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
byte[] json = new byte[length];
in.readFully(json);
- Schema schema = new Schema.Parser().parse(new String(json));
+ Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
setSchema(schema);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index cec980f..c02c2cb 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,6 +24,7 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
@@ -119,7 +120,7 @@ public class ZookeeperOffsetHandler {
ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
String path = topicDirs.consumerOffsetDir() + "/" + partition;
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
- byte[] data = Long.toString(offset).getBytes();
+ byte[] data = Long.toString(offset).getBytes(ConfigConstants.DEFAULT_CHARSET);
curatorClient.setData().forPath(path, data);
}
@@ -133,7 +134,7 @@ public class ZookeeperOffsetHandler {
if (data == null) {
return null;
} else {
- String asString = new String(data);
+ String asString = new String(data, ConfigConstants.DEFAULT_CHARSET);
if (asString.length() == 0) {
return null;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 8a4c408..37ed408 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -19,20 +19,17 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Simple ZooKeeper serializer for Strings.
*/
public class ZooKeeperStringSerializer implements ZkSerializer {
- private static final Charset CHARSET = Charset.forName("UTF-8");
-
@Override
public byte[] serialize(Object data) {
if (data instanceof String) {
- return ((String) data).getBytes(CHARSET);
+ return ((String) data).getBytes(ConfigConstants.DEFAULT_CHARSET);
}
else {
throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
@@ -45,7 +42,7 @@ public class ZooKeeperStringSerializer implements ZkSerializer {
return null;
}
else {
- return new String(bytes, CHARSET);
+ return new String(bytes, ConfigConstants.DEFAULT_CHARSET);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 6e02a55..63c6c2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
@@ -62,7 +63,7 @@ public class ManualConsumerProducerTest {
new KinesisSerializationSchema<String>() {
@Override
public ByteBuffer serialize(String element) {
- return ByteBuffer.wrap(element.getBytes());
+ return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// every 10th element goes into a different stream
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 6abea2a..71bcae3 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kinesis.manualtests;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
@@ -119,7 +119,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
}
batch.add(
new PutRecordsRequestEntry()
- .withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
+ .withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString()));
}
count += batchSize;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 35e9ef6..1df717c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.manualtests;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
@@ -59,7 +60,7 @@ public class ManualProducerTest {
new KinesisSerializationSchema<String>() {
@Override
public ByteBuffer serialize(String element) {
- return ByteBuffer.wrap(element.getBytes());
+ return ByteBuffer.wrap(element.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// every 10th element goes into a different stream
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 964ee76..b62e7de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -21,14 +21,15 @@ import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -196,7 +197,7 @@ public class FakeKinesisBehavioursFactory {
for (int i = min; i < max; i++) {
batch.add(
new Record()
- .withData(ByteBuffer.wrap(String.valueOf(i).getBytes()))
+ .withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(i)));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
index 572f949..202e80a 100644
--- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.nifi.examples;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
@@ -45,7 +46,8 @@ public class NiFiSinkTopologyExample {
.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
@Override
public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
- return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
+ return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
+ new HashMap<String,String>());
}
}));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 26434ed..b65ddf0 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
@@ -27,6 +28,7 @@ import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -45,7 +47,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.modules.junit4.PowerMockRunner;
-import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -323,7 +324,7 @@ public class RMQSourceTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
- return new String(message);
+ return new String(message, ConfigConstants.DEFAULT_CHARSET);
}
@Override
@@ -365,7 +366,7 @@ public class RMQSourceTest {
// Mock for delivery
final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
- Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
+ Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes(ConfigConstants.DEFAULT_CHARSET));
try {
Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
index c32f5da..8c3a8cd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils.runtime;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
@@ -41,7 +42,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
out.writeInt(this.array.length);
for(String str : this.array) {
- byte[] b = str.getBytes();
+ byte[] b = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
out.writeInt(b.length);
out.write(b);
}
@@ -54,7 +55,7 @@ public class StringArrayWritable implements Writable, Comparable<StringArrayWrit
for(int i = 0; i < this.array.length; i++) {
byte[] b = new byte[in.readInt()];
in.readFully(b);
- this.array[i] = new String(b);
+ this.array[i] = new String(b, ConfigConstants.DEFAULT_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
index 8579dee..f56295e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -18,10 +18,12 @@
package org.apache.flink.addons.hbase.example;
+import org.apache.flink.configuration.ConfigConstants;
+
public class HBaseFlinkTestConstants {
- public static final byte[] CF_SOME = "someCf".getBytes();
- public static final byte[] Q_SOME = "someQual".getBytes();
+ public static final byte[] CF_SOME = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET);
+ public static final byte[] Q_SOME = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET);
public static final String TEST_TABLE_NAME = "test-table";
public static final String TMP_DIR = "/tmp/test";
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
index 483bdff..64d20c3 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Mutation;
@@ -87,7 +88,7 @@ public class HBaseWriteExample {
@Override
public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
reuse.f0 = new Text(t.f0);
- Put put = new Put(t.f0.getBytes());
+ Put put = new Put(t.f0.getBytes(ConfigConstants.DEFAULT_CHARSET));
put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
reuse.f1 = put;
return reuse;
@@ -199,4 +200,4 @@ public class HBaseWriteExample {
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bd8d4dd..eb926c0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
@@ -171,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
// RocksDB seems to need this...
- columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+ columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
try {
@@ -727,7 +728,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
if (null == columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
- metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+ metaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+ rocksDBKeyedStateBackend.columnOptions);
RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
@@ -824,7 +826,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
- descriptor.getName().getBytes(), columnOptions);
+ descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 1cb3b2b..956ef2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
@@ -71,7 +72,7 @@ public class RocksDBMergeIteratorTest {
for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
ColumnFamilyHandle handle = rocksDB.createColumnFamily(
- new ColumnFamilyDescriptor(("column-" + c).getBytes()));
+ new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
DataOutputStream dos = new DataOutputStream(bos);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
index 7ea4b76..5e6c160 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.storm.wrappers;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.MessageId;
@@ -188,7 +189,7 @@ public class StormTupleTest extends AbstractTest {
public void testString() {
final byte[] data = new byte[this.r.nextInt(15)];
this.r.nextBytes(data);
- final String flinkTuple = new String(data);
+ final String flinkTuple = new String(data, ConfigConstants.DEFAULT_CHARSET);
final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null, -1, null, null,
null);
@@ -304,7 +305,7 @@ public class StormTupleTest extends AbstractTest {
public void testStringTuple() {
final byte[] rawdata = new byte[this.r.nextInt(15)];
this.r.nextBytes(rawdata);
- final String data = new String(rawdata);
+ final String data = new String(rawdata, ConfigConstants.DEFAULT_CHARSET);
final int index = this.r.nextInt(5);
final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
index a72fc14..f7f1bde 100644
--- a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -90,4 +90,4 @@ public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implemen
public TypeInformation<Tweet> getProducedType() {
return new GenericTypeInfo<Tweet>(Tweet.class);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index b934d41..bddaec9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -362,7 +362,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
if (lenient) {
return false;
} else {
- throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
}
}
@@ -380,7 +380,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
if (lenient) {
return false;
} else {
- String lineAsString = new String(bytes, offset, numBytes);
+ String lineAsString = new String(bytes, offset, numBytes, getCharset());
throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+ "ParserError " + parser.getErrorState() + " \n"
+ "Expect field types: "+fieldTypesToString() + " \n"
@@ -405,7 +405,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
if (startPos < 0) {
if (!lenient) {
- String lineAsString = new String(bytes, offset, numBytes);
+ String lineAsString = new String(bytes, offset, numBytes, getCharset());
throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
+ "Expect field types: "+fieldTypesToString()+" \n"
+ "in file: "+filePath);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5129f20..c7c8b1a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,9 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/**
@@ -1428,6 +1431,10 @@ public final class ConfigConstants {
/** The environment variable name which contains the Flink installation root directory */
public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+ // ---------------------------- Encoding ------------------------------
+
+ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
/**
* Not instantiable.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index ddfd30a..abf65b1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -19,6 +19,7 @@
package org.apache.flink.core.memory;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -97,7 +98,7 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
}
public String toString() {
- return new String(buffer, 0, count);
+ return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
}
private int getEndPosition() {
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 11e459a..4e1aa3e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.math.BigInteger;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link java.math.BigInteger}.
@@ -45,7 +46,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = new BigInteger(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return new BigInteger(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index f8b890a..908c05f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -19,6 +19,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
@PublicEvolving
public class BooleanParser extends FieldParser<Boolean> {
@@ -27,12 +28,12 @@ public class BooleanParser extends FieldParser<Boolean> {
/** Values for true and false respectively. Must be lower case. */
private static final byte[][] TRUE = new byte[][] {
- "true".getBytes(),
- "1".getBytes()
+ "true".getBytes(ConfigConstants.DEFAULT_CHARSET),
+ "1".getBytes(ConfigConstants.DEFAULT_CHARSET)
};
private static final byte[][] FALSE = new byte[][] {
- "false".getBytes(),
- "0".getBytes()
+ "false".getBytes(ConfigConstants.DEFAULT_CHARSET),
+ "0".getBytes(ConfigConstants.DEFAULT_CHARSET)
};
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 2474adf..409cff2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a Double.
@@ -44,7 +45,7 @@ public class DoubleParser extends FieldParser<Double> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Double.parseDouble(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -101,7 +102,7 @@ public class DoubleParser extends FieldParser<Double> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Double.parseDouble(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 10b43c3..8f64691 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.DoubleValue;
/**
@@ -43,7 +44,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
double value = Double.parseDouble(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index e76484e..5636a4e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Float}.
@@ -42,7 +43,7 @@ public class FloatParser extends FieldParser<Float> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Float.parseFloat(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -99,7 +100,7 @@ public class FloatParser extends FieldParser<Float> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Float.parseFloat(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index a834f22..83fe63f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -20,6 +20,7 @@
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.FloatValue;
/**
@@ -43,7 +44,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
float value = Float.parseFloat(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
index 859dcf8..24374b8 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Date;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link java.sql.Date}.
@@ -45,7 +46,7 @@ public class SqlDateParser extends FieldParser<Date> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Date.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlDateParser extends FieldParser<Date> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Date.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
index fbddadc..363cbb9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Time;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Time}.
@@ -39,7 +40,7 @@ public class SqlTimeParser extends FieldParser<Time> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Time.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -96,7 +97,7 @@ public class SqlTimeParser extends FieldParser<Time> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Time.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
index 0bcb602..97443a5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -21,6 +21,7 @@ package org.apache.flink.types.parser;
import java.sql.Timestamp;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
/**
* Parses a text field into a {@link Timestamp}.
@@ -45,7 +46,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
return -1;
}
- String str = new String(bytes, startPos, endPos - startPos);
+ String str = new String(bytes, startPos, endPos - startPos, ConfigConstants.DEFAULT_CHARSET);
try {
this.result = Timestamp.valueOf(str);
return (endPos == limit) ? limit : endPos + delimiter.length;
@@ -102,7 +103,7 @@ public class SqlTimestampParser extends FieldParser<Timestamp> {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- final String str = new String(bytes, startPos, limitedLen);
+ final String str = new String(bytes, startPos, limitedLen, ConfigConstants.DEFAULT_CHARSET);
return Timestamp.valueOf(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 7ce0a2e..2ff5ee7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common.io;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -73,11 +74,11 @@ public class DelimitedInputFormatTest {
cfg.setString("delimited-format.delimiter", "\n");
format.configure(cfg);
- assertEquals("\n", new String(format.getDelimiter()));
+ assertEquals("\n", new String(format.getDelimiter(), format.getCharset()));
cfg.setString("delimited-format.delimiter", "&-&");
format.configure(cfg);
- assertEquals("&-&", new String(format.getDelimiter()));
+ assertEquals("&-&", new String(format.getDelimiter(), format.getCharset()));
}
@Test
@@ -428,7 +429,7 @@ public class DelimitedInputFormatTest {
@Override
public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
- return new String(bytes, offset, numBytes);
+ return new String(bytes, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5599dd0..dfda372 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
@@ -269,7 +270,7 @@ public class FileInputFormatTest {
File luigiFile = temporaryFolder.newFile("_luigi");
File success = temporaryFolder.newFile("_SUCCESS");
- createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2, luigiFile, success);
// test that only the valid files are accepted
@@ -308,7 +309,7 @@ public class FileInputFormatTest {
File[] files = { child1, child2 };
- createTempFiles(contents.getBytes(), files);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), files);
// test that only the valid files are accepted
@@ -345,7 +346,7 @@ public class FileInputFormatTest {
File child1 = temporaryFolder.newFile("dataFile1.txt");
File child2 = temporaryFolder.newFile("another_file.bin");
- createTempFiles(contents.getBytes(), child1, child2);
+ createTempFiles(contents.getBytes(ConfigConstants.DEFAULT_CHARSET), child1, child2);
// test that only the valid files are accepted
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 655ffd5..674f7e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -113,7 +113,7 @@ public class ValueStateDescriptorTest {
}
data[199000] = '\0';
- String defaultValue = new String(data);
+ String defaultValue = new String(data, ConfigConstants.DEFAULT_CHARSET);
ValueStateDescriptor<String> descr =
new ValueStateDescriptor<String>("testName", serializer, defaultValue);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index 7572408..d85ff95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
@@ -35,6 +36,7 @@ import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class KryoClearedBufferTest {
@@ -262,7 +264,7 @@ public class KryoClearedBufferTest {
@Override
public void writeBytes(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
@@ -270,7 +272,7 @@ public class KryoClearedBufferTest {
@Override
public void writeChars(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
@@ -278,7 +280,7 @@ public class KryoClearedBufferTest {
@Override
public void writeUTF(String s) throws IOException {
- byte[] sBuffer = s.getBytes();
+ byte[] sBuffer = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
checkSize(sBuffer.length);
System.arraycopy(sBuffer, 0, buffer, position, sBuffer.length);
position += sBuffer.length;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index 9b02147..51ace12 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -85,9 +86,9 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser2 = getParser();
FieldParser<T> parser3 = getParser();
- byte[] bytes1 = testValues[i].getBytes();
- byte[] bytes2 = testValues[i].getBytes();
- byte[] bytes3 = testValues[i].getBytes();
+ byte[] bytes1 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes2 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes3 = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&'}, parser2.createValue());
@@ -132,8 +133,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
String testVal1 = testValues[i] + "|";
String testVal2 = testValues[i] + "&&&&";
- byte[] bytes1 = testVal1.getBytes();
- byte[] bytes2 = testVal2.getBytes();
+ byte[] bytes1 = testVal1.getBytes(ConfigConstants.DEFAULT_CHARSET);
+ byte[] bytes2 = testVal2.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead1 = parser1.parseField(bytes1, 0, bytes1.length, new byte[] {'|'}, parser1.createValue());
int numRead2 = parser2.parseField(bytes2, 0, bytes2.length, new byte[] {'&', '&','&', '&'}, parser2.createValue());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser = getParser();
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
@@ -318,7 +319,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < testValues.length; i++) {
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
T result;
@@ -355,7 +356,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < testValues.length; i++) {
- byte[] bytes = testValues[i].getBytes();
+ byte[] bytes = testValues[i].getBytes(ConfigConstants.DEFAULT_CHARSET);
try {
parseMethod.invoke(null, bytes, 0, bytes.length, '|');
@@ -389,7 +390,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
for (int i = 0; i < values.length; i++) {
String s = values[i];
- byte[] bytes = s.getBytes();
+ byte[] bytes = s.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numBytes = bytes.length;
System.arraycopy(bytes, 0, result, currPos, numBytes);
currPos += numBytes;
@@ -411,7 +412,7 @@ public abstract class ParserTestBase<T> extends TestLogger {
FieldParser<T> parser = getParser();
for (String emptyString : emptyStrings) {
- byte[] bytes = emptyString.getBytes();
+ byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 718274e..e6e6c62 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.types.parser;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
@@ -45,7 +46,7 @@ public class VarLengthStringParserTest {
this.parser = new StringValueParser();
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "abcdefgh|i|jklmno|".getBytes();
+ byte[] recBytes = "abcdefgh|i|jklmno|".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -63,14 +64,14 @@ public class VarLengthStringParserTest {
// check single field not terminated
- recBytes = "abcde".getBytes();
+ recBytes = "abcde".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 5);
assertTrue(s.getValue().equals("abcde"));
// check last field not terminated
- recBytes = "abcde|fg".getBytes();
+ recBytes = "abcde|fg".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 6);
@@ -88,7 +89,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'"');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes();
+ byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -106,14 +107,14 @@ public class VarLengthStringParserTest {
// check single field not terminated
- recBytes = "\"abcde\"".getBytes();
+ recBytes = "\"abcde\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 7);
assertTrue(s.getValue().equals("abcde"));
// check last field not terminated
- recBytes = "\"abcde\"|\"fg\"".getBytes();
+ recBytes = "\"abcde\"|\"fg\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 8);
@@ -124,7 +125,7 @@ public class VarLengthStringParserTest {
assertTrue(s.getValue().equals("fg"));
// check delimiter in quotes
- recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes();
+ recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 11);
@@ -135,7 +136,7 @@ public class VarLengthStringParserTest {
assertTrue(s.getValue().equals("hij|kl|mn|op"));
// check delimiter in quotes last field not terminated
- recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes();
+ recBytes = "\"abcde|fg\"|\"hij|kl|mn|op\"".getBytes(ConfigConstants.DEFAULT_CHARSET);
startPos = 0;
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos == 11);
@@ -153,7 +154,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'@');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -187,7 +188,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte)'"');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "\"abcdefgh\"-|\"jklmno ".getBytes();
+ byte[] recBytes = "\"abcdefgh\"-|\"jklmno ".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
@@ -207,7 +208,7 @@ public class VarLengthStringParserTest {
this.parser.enableQuotedStringParsing((byte) '@');
// check valid strings with out whitespaces and trailing delimiter
- byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(ConfigConstants.DEFAULT_CHARSET);
StringValue s = new StringValue();
int startPos = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 4a1556a..c6f46e3 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.test.socket;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.testdata.WordCountData;
@@ -62,7 +63,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
new String[] { "--port", String.valueOf(serverPort) });
if (errorMessages.size() != 0) {
- fail("Found error message: " + new String(errorMessages.toByteArray()));
+ fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
}
serverThread.join();
@@ -101,7 +102,7 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
new String[] { "--port", String.valueOf(serverPort) });
if (errorMessages.size() != 0) {
- fail("Found error message: " + new String(errorMessages.toByteArray()));
+ fail("Found error message: " + new String(errorMessages.toByteArray(), ConfigConstants.DEFAULT_CHARSET));
}
serverThread.join();
@@ -154,4 +155,4 @@ public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBa
@Override
public void write(int b) {}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
index df68a76..bc42838 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -294,7 +295,7 @@ public class ContinuousFileProcessingITCase extends StreamingProgramTestBase {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx + ": " + sampleLine + " " + i + "\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
return new Tuple2<>(tmp, str.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 440bfcc..e271a21 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -389,7 +390,7 @@ public class ContinuousFileProcessingMigrationTest {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index f579345..19358e3 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -1042,7 +1043,7 @@ public class ContinuousFileProcessingTest {
for (int i = 0; i < LINES_PER_FILE; i++) {
String line = fileIdx +": "+ sampleLine + " " + i +"\n";
str.append(line);
- stream.write(line.getBytes());
+ stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 75e666f..8a3f662 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
@@ -80,7 +81,7 @@ public class HDFSTest {
hdfs = hdPath.getFileSystem(hdConf);
FSDataOutputStream stream = hdfs.create(hdPath);
for(int i = 0; i < 10; i++) {
- stream.write("Hello HDFS\n".getBytes());
+ stream.write("Hello HDFS\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
}
stream.close();
@@ -193,7 +194,7 @@ public class HDFSTest {
fs.mkdirs(directory);
- byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes();
+ byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(ConfigConstants.DEFAULT_CHARSET);
for (Path file: Arrays.asList(singleFile, directoryFile)) {
org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true);
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 05ed6fa..d454765 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -78,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
if (parser.resetErrorStateAndParse(bytes, offset, numBytes + offset, new byte[]{'\0'}, reuse) >= 0) {
return parser.getLastResult();
} else {
- String s = new String(bytes, offset, numBytes);
+ String s = new String(bytes, offset, numBytes, getCharset());
throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index ce37c74..b752966 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -155,7 +155,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
if (isLenient()) {
return false;
} else {
- throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index a303ff7..d047aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -79,7 +80,7 @@ public class CsvInputFormatTest {
tempFile.deleteOnExit();
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
- fileOutputStream.write(fileContent.getBytes());
+ fileOutputStream.write(fileContent.getBytes(ConfigConstants.DEFAULT_CHARSET));
}
// fix the number of blocks and the size of each one.
@@ -793,7 +794,8 @@ public class CsvInputFormatTest {
for (Object[] failure : failures) {
String input = (String) failure[0];
- int result = stringParser.parseField(input.getBytes(), 0, input.length(), new byte[]{'|'}, null);
+ int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
+ input.length(), new byte[]{'|'}, null);
assertThat(result, is(-1));
assertThat(stringParser.getErrorState(), is(failure[1]));
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index f6bda30..943db36 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -701,7 +702,7 @@ public class RowCsvInputFormatTest {
for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
int result = stringParser.parseField(
- failure.getKey().getBytes(),
+ failure.getKey().getBytes(ConfigConstants.DEFAULT_CHARSET),
0,
failure.getKey().length(),
new byte[]{(byte) '|'},
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
index d89fc41..3d79b08 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java
@@ -13,6 +13,7 @@
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.ConfigConstants;
/*
Utility function to deserialize strings, used for non-CSV sinks.
@@ -21,6 +22,6 @@ public class StringDeserializerMap implements MapFunction<byte[], String> {
@Override
public String map(byte[] value) throws Exception {
//discard type byte and size
- return new String(value, 5, value.length - 5);
+ return new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
index b6d60e1..af5eac6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java
@@ -14,6 +14,7 @@ package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.ConfigConstants;
/*
Utility function to deserialize strings, used for CSV sinks.
@@ -22,6 +23,6 @@ public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<St
@Override
public Tuple1<String> map(byte[] value) throws Exception {
//5 = string type byte + string size
- return new Tuple1<>(new String(value, 5, value.length - 5));
+ return new Tuple1<>(new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53fedbd2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..c968bd6 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -12,9 +12,19 @@
*/
package org.apache.flink.python.api.streaming.data;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
+import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import org.apache.flink.python.api.streaming.util.StreamPrinter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
@@ -23,9 +33,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
@@ -33,11 +41,6 @@ import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAM
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
-import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This streamer is used by functions to send/receive data to/from an external python process.
@@ -127,12 +130,13 @@ public class PythonStreamer implements Serializable {
Runtime.getRuntime().addShutdownHook(shutdownThread);
OutputStream processOutput = process.getOutputStream();
- processOutput.write("operator\n".getBytes());
- processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
- processOutput.write((id + "\n").getBytes());
- processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
- processOutput.write((inputFilePath + "\n").getBytes());
- processOutput.write((outputFilePath + "\n").getBytes());
+ processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((id + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
+ .getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+ processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
processOutput.flush();
try { // wait a bit to catch syntax errors
[7/9] flink git commit: [FLINK-5830] [distributed coordination]
Handle OutOfMemory and fatal errors during process async message in akka rpc
actor
Posted by se...@apache.org.
[FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors during process async message in akka rpc actor
This closes #3360
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527eabdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527eabdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527eabdd
Branch: refs/heads/master
Commit: 527eabdd4fb3e34b0698b53ec9a7fb1348882791
Parents: 8b49ee5
Author: \u6dd8\u6c5f <ta...@alibaba-inc.com>
Authored: Mon Feb 20 17:54:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/util/ExceptionUtils.java | 13 +++++++++++++
.../apache/flink/runtime/rpc/akka/AkkaRpcActor.java | 6 ++++--
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7167a0b..ca81465 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -113,6 +113,19 @@ public final class ExceptionUtils {
}
/**
+ * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM
+ * or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a
+ * definition of fatal errors.
+ *
+ * @param t The Throwable to check and rethrow.
+ */
+ public static void rethrowIfFatalErrorOrOOM(Throwable t) {
+ if (isJvmFatalError(t) || t instanceof OutOfMemoryError) {
+ throw (Error) t;
+ }
+ }
+
+ /**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/527eabdd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 264ba96..99f8211 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,8 +272,9 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
// run immediately
try {
runAsync.getRunnable().run();
- } catch (final Throwable e) {
- LOG.error("Caught exception while executing runnable in main thread.", e);
+ } catch (Throwable t) {
+ LOG.error("Caught exception while executing runnable in main thread.", t);
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
}
else {
[5/9] flink git commit: [FLINK-4545] [network] remove fixed-size
BufferPool instances
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index e141cc2..424fc8b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.client.JobExecutionException
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
@@ -94,7 +95,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
receiver.setInvokableClass(classOf[BlockingReceiver])
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
@@ -146,7 +148,8 @@ class TaskManagerFailsITCase(_system: ActorSystem)
receiver.setInvokableClass(classOf[BlockingReceiver])
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val jobID = jobGraph.getJobID
[6/9] flink git commit: [FLINK-4545] [network] remove fixed-size
BufferPool instances
Posted by se...@apache.org.
[FLINK-4545] [network] remove fixed-size BufferPool instances
This closes #3467
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b49ee5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b49ee5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b49ee5a
Branch: refs/heads/master
Commit: 8b49ee5aa2e17b1787764c3265e1ebda47d89840
Parents: 3ab91cc
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 10 16:11:08 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:56 2017 +0100
----------------------------------------------------------------------
.../plantranslate/JobGraphGenerator.java | 4 +-
.../BackPressureStatsTrackerITCase.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 4 +-
.../serialization/SpanningRecordSerializer.java | 2 +-
.../io/network/buffer/BufferPoolFactory.java | 4 +-
.../io/network/buffer/LocalBufferPool.java | 36 +++++++++---
.../io/network/buffer/NetworkBufferPool.java | 24 +++-----
.../netty/PartitionRequestServerHandler.java | 2 +-
.../network/partition/ResultPartitionType.java | 16 +----
.../runtime/jobgraph/IntermediateDataSet.java | 8 ---
.../flink/runtime/jobgraph/JobVertex.java | 4 --
.../ExecutionGraphConstructionTest.java | 38 ++++++------
.../ExecutionGraphDeploymentTest.java | 6 +-
.../ExecutionGraphSignalsTest.java | 11 ++--
.../ExecutionVertexLocalityTest.java | 3 +-
.../executiongraph/PointwisePatternTest.java | 15 ++---
.../executiongraph/VertexSlotSharingTest.java | 5 +-
.../io/network/MockNetworkEnvironment.java | 2 +-
.../io/network/api/writer/RecordWriterTest.java | 2 +-
.../network/buffer/BufferPoolFactoryTest.java | 31 +++-------
.../network/buffer/NetworkBufferPoolTest.java | 55 +++++-------------
.../consumer/LocalInputChannelTest.java | 4 +-
.../flink/runtime/jobgraph/JobGraphTest.java | 61 ++++++++++----------
.../runtime/jobgraph/JobTaskVertexTest.java | 5 +-
.../jobgraph/jsonplan/JsonGeneratorTest.java | 4 +-
.../LeaderChangeJobRecoveryTest.java | 3 +-
.../LeaderChangeStateCleanupTest.java | 3 +-
.../TaskCancelAsyncProducerConsumerITCase.java | 3 +-
.../jobmanager/CoLocationConstraintITCase.scala | 11 ++--
.../runtime/jobmanager/JobManagerITCase.scala | 40 ++++++++-----
.../runtime/jobmanager/RecoveryITCase.scala | 18 +++---
.../runtime/jobmanager/SlotSharingITCase.scala | 19 +++---
.../TaskManagerFailsWithSlotSharingITCase.scala | 12 ++--
.../io/BarrierBufferMassiveRandomTest.java | 4 +-
.../runtime/NetworkStackThroughputITCase.java | 10 +++-
.../ZooKeeperLeaderElectionITCase.java | 4 +-
.../taskmanager/TaskManagerFailsITCase.scala | 7 ++-
37 files changed, 238 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index caeb43f..61e5327 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1289,7 +1289,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
syncConfig.setNumberOfIterations(maxNumIterations);
// connect the sync task
- sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+ sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
// ----------------------------- create the iteration tail ------------------------------
@@ -1425,7 +1425,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
syncConfig.setNumberOfIterations(maxNumIterations);
// connect the sync task
- sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+ sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
}
// ----------------------------- create the iteration tails -----------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 30a86a2..1943129 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -108,7 +108,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
//
// 1) Consume all buffers at first (no buffers for the test task)
//
- testBufferPool = networkBufferPool.createBufferPool(1, false);
+ testBufferPool = networkBufferPool.createBufferPool(1);
final List<Buffer> buffers = new ArrayList<>();
while (true) {
Buffer buffer = testBufferPool.requestBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 5cf2c26..8e85ffe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -171,7 +171,7 @@ public class NetworkEnvironment {
BufferPool bufferPool = null;
try {
- bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+ bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions());
partition.registerBufferPool(bufferPool);
resultPartitionManager.registerResultPartition(partition);
@@ -198,7 +198,7 @@ public class NetworkEnvironment {
BufferPool bufferPool = null;
try {
- bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
+ bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels());
gate.setBufferPool(bufferPool);
} catch (Throwable t) {
if (bufferPool != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index cb5665b..6c541a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -43,7 +43,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
/** Intermediate data serialization */
private final DataOutputSerializer serializationBuffer;
- /** Intermediate buffer for data serialization */
+ /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}) */
private ByteBuffer dataBuffer;
/** Intermediate buffer for length serialization */
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index 23321f4..e953158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -29,9 +29,9 @@ public interface BufferPoolFactory {
* Tries to create a buffer pool, which is guaranteed to provide at least the number of required
* buffers.
*
- * <p> The buffer pool is either of dynamic size or fixed.
+ * <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
*/
- BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException;
+ BufferPool createBufferPool(int numRequiredBuffers) throws IOException;
/**
* Destroy callback for updating factory book keeping.
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 86e6870..d6a4cf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -45,30 +45,46 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
class LocalBufferPool implements BufferPool {
+ /** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;
- // The minimum number of required segments for this pool
+ /** The minimum number of required segments for this pool */
private final int numberOfRequiredMemorySegments;
- // The currently available memory segments. These are segments, which have been requested from
- // the network buffer pool and are currently not handed out as Buffer instances.
+ /**
+ * The currently available memory segments. These are segments, which have been requested from
+ * the network buffer pool and are currently not handed out as Buffer instances.
+ */
private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
- // Buffer availability listeners, which need to be notified when a Buffer becomes available.
- // Listeners can only be registered at a time/state where no Buffer instance was available.
+ /**
+ * Buffer availability listeners, which need to be notified when a Buffer becomes available.
+ * Listeners can only be registered at a time/state where no Buffer instance was available.
+ */
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
- // The current size of this pool
+ /** The current size of this pool */
private int currentPoolSize;
- // Number of all memory segments, which have been requested from the network buffer pool and are
- // somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
+ /**
+ * Number of all memory segments, which have been requested from the network buffer pool and are
+ * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
+ */
private int numberOfRequestedMemorySegments;
private boolean isDestroyed;
private BufferPoolOwner owner;
+ /**
+ * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
+ * network buffers being available.
+ *
+ * @param networkBufferPool
+ * global network buffer pool to get buffers from
+ * @param numberOfRequiredMemorySegments
+ * minimum number of network buffers
+ */
LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
this.networkBufferPool = networkBufferPool;
this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
@@ -265,11 +281,15 @@ class LocalBufferPool implements BufferPool {
// ------------------------------------------------------------------------
private void returnMemorySegment(MemorySegment segment) {
+ assert Thread.holdsLock(availableMemorySegments);
+
numberOfRequestedMemorySegments--;
networkBufferPool.recycle(segment);
}
private void returnExcessMemorySegments() {
+ assert Thread.holdsLock(availableMemorySegments);
+
while (numberOfRequestedMemorySegments > currentPoolSize) {
MemorySegment segment = availableMemorySegments.poll();
if (segment == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index dc23341..5345fbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -58,9 +58,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
private final Object factoryLock = new Object();
- private final Set<LocalBufferPool> managedBufferPools = new HashSet<LocalBufferPool>();
-
- public final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
+ private final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
private int numTotalRequiredBuffers;
@@ -182,7 +180,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
// ------------------------------------------------------------------------
@Override
- public BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException {
+ public BufferPool createBufferPool(int numRequiredBuffers) throws IOException {
// It is necessary to use a separate lock from the one used for buffer
// requests to ensure deadlock freedom for failure cases.
synchronized (factoryLock) {
@@ -209,12 +207,6 @@ public class NetworkBufferPool implements BufferPoolFactory {
// non-fixed size buffers.
LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
- // The fixed size pools get their share of buffers and don't change
- // it during their lifetime.
- if (!isFixedSize) {
- managedBufferPools.add(localBufferPool);
- }
-
allBufferPools.add(localBufferPool);
redistributeBuffers();
@@ -231,8 +223,6 @@ public class NetworkBufferPool implements BufferPoolFactory {
synchronized (factoryLock) {
if (allBufferPools.remove(bufferPool)) {
- managedBufferPools.remove(bufferPool);
-
numTotalRequiredBuffers -= bufferPool.getNumberOfRequiredMemorySegments();
try {
@@ -246,7 +236,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
/**
* Destroys all buffer pools that allocate their buffers from this
- * buffer pool (created via {@link #createBufferPool(int, boolean)}).
+ * buffer pool (created via {@link #createBufferPool(int)}).
*/
public void destroyAllBufferPools() {
synchronized (factoryLock) {
@@ -258,7 +248,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
}
// some sanity checks
- if (allBufferPools.size() > 0 || managedBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
+ if (allBufferPools.size() > 0 || numTotalRequiredBuffers > 0) {
throw new IllegalStateException("NetworkBufferPool is not empty after destroying all LocalBufferPools");
}
}
@@ -266,7 +256,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
// Must be called from synchronized block
private void redistributeBuffers() throws IOException {
- int numManagedBufferPools = managedBufferPools.size();
+ assert Thread.holdsLock(factoryLock);
+
+ int numManagedBufferPools = allBufferPools.size();
if (numManagedBufferPools == 0) {
return; // necessary to avoid div by zero when no managed pools
@@ -283,7 +275,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
int bufferPoolIndex = 0;
- for (LocalBufferPool bufferPool : managedBufferPools) {
+ for (LocalBufferPool bufferPool : allBufferPools) {
int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 12b52ec..36c1234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
- bufferPool = networkBufferPool.createBufferPool(1, false);
+ bufferPool = networkBufferPool.createBufferPool(1);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 65d49ed..43d3a52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -20,14 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
public enum ResultPartitionType {
- BLOCKING(true, false, false),
+ BLOCKING(false, false),
- PIPELINED(false, true, true),
-
- PIPELINED_PERSISTENT(true, true, true);
-
- /** Does the partition live longer than the consuming task? */
- private final boolean isPersistent;
+ PIPELINED(true, true);
/** Can the partition be consumed while being produced? */
private final boolean isPipelined;
@@ -38,8 +33,7 @@ public enum ResultPartitionType {
/**
* Specifies the behaviour of an intermediate result partition at runtime.
*/
- ResultPartitionType(boolean isPersistent, boolean isPipelined, boolean hasBackPressure) {
- this.isPersistent = isPersistent;
+ ResultPartitionType(boolean isPipelined, boolean hasBackPressure) {
this.isPipelined = isPipelined;
this.hasBackPressure = hasBackPressure;
}
@@ -55,8 +49,4 @@ public enum ResultPartitionType {
public boolean isPipelined() {
return isPipelined;
}
-
- public boolean isPersistent() {
- return isPersistent;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index 2d9faa8..f02aaa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -47,14 +47,6 @@ public class IntermediateDataSet implements java.io.Serializable {
private final ResultPartitionType resultType;
// --------------------------------------------------------------------------------------------
-
- public IntermediateDataSet(JobVertex producer) {
- this(new IntermediateDataSetID(), producer);
- }
-
- public IntermediateDataSet(IntermediateDataSetID id, JobVertex producer) {
- this(id, ResultPartitionType.PIPELINED, producer);
- }
public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer) {
this.id = checkNotNull(id);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 9dcaeeb..260bd74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -400,10 +400,6 @@ public class JobVertex implements java.io.Serializable {
return edge;
}
- public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
- return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
- }
-
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index fa48384..e1bad56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -104,11 +104,11 @@ public class ExecutionGraphConstructionTest {
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
@@ -153,7 +153,7 @@ public class ExecutionGraphConstructionTest {
v3.setInvokableClass(AbstractInvokable.class);
// this creates an intermediate result for v1
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
// create results for v2 and v3
IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
@@ -193,7 +193,7 @@ public class ExecutionGraphConstructionTest {
v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL);
List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
@@ -230,7 +230,7 @@ public class ExecutionGraphConstructionTest {
v3.setInvokableClass(AbstractInvokable.class);
// this creates an intermediate result for v1
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
// create results for v2 and v3
IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
@@ -269,7 +269,7 @@ public class ExecutionGraphConstructionTest {
v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL);
List<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));
@@ -558,11 +558,11 @@ public class ExecutionGraphConstructionTest {
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
@@ -625,11 +625,11 @@ public class ExecutionGraphConstructionTest {
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
- v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v3.setInputSplitSource(source1);
v5.setInputSplitSource(source2);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 3d2913f..30824e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -90,9 +90,9 @@ public class ExecutionGraphDeploymentTest {
v3.setInvokableClass(BatchTask.class);
v4.setInvokableClass(BatchTask.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
- v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 64b9aa2..27844c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -119,15 +120,15 @@ public class ExecutionGraphSignalsTest {
v4.setParallelism(dop[3]);
v5.setParallelism(dop[4]);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
mockNumberOfInputs(1,0);
- v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+ v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
mockNumberOfInputs(3,1);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
mockNumberOfInputs(3,2);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
mockNumberOfInputs(4,3);
- v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+ v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
mockNumberOfInputs(4,2);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index cfd4665..eb85a33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -200,7 +201,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
target.setInvokableClass(NoOpInvokable.class);
DistributionPattern connectionPattern = allToAll ? DistributionPattern.ALL_TO_ALL : DistributionPattern.POINTWISE;
- target.connectNewDataSetAsInput(source, connectionPattern);
+ target.connectNewDataSetAsInput(source, connectionPattern, ResultPartitionType.PIPELINED);
JobGraph testJob = new JobGraph(jobId, "test job", source, target);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 5629c0b..8ff0032 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -62,7 +63,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -109,7 +110,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -157,7 +158,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -206,7 +207,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -253,7 +254,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -320,7 +321,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
@@ -378,7 +379,7 @@ public class PointwisePatternTest {
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index bf17485..90e3368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.api.common.JobID;
@@ -67,8 +68,8 @@ public class VertexSlotSharingTest {
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
- v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v5.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
SlotSharingGroup jg1 = new SlotSharingGroup();
v2.setSlotSharingGroup(jg1);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
index 718ab45..dcdf44c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
@@ -36,7 +36,7 @@ public class MockNetworkEnvironment {
static {
try {
- when(networkBufferPool.createBufferPool(anyInt(), anyBoolean())).thenReturn(mock(BufferPool.class));
+ when(networkBufferPool.createBufferPool(anyInt())).thenReturn(mock(BufferPool.class));
when(networkEnvironment.getNetworkBufferPool()).thenReturn(networkBufferPool);
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 900b5c3..ca1d0a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -177,7 +177,7 @@ public class RecordWriterTest {
try {
buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
- bufferPool = spy(buffers.createBufferPool(1, true));
+ bufferPool = spy(buffers.createBufferPool(1));
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 0ac84dc..49808c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -52,37 +52,20 @@ public class BufferPoolFactoryTest {
@Test(expected = IOException.class)
public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, false);
- }
-
- @Test
- public void testFixedPool() throws IOException {
- BufferPool lbp = networkBufferPool.createBufferPool(1, true);
-
- assertEquals(1, lbp.getNumBuffers());
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2);
}
@Test
public void testSingleManagedPoolGetsAll() throws IOException {
- BufferPool lbp = networkBufferPool.createBufferPool(1, false);
+ BufferPool lbp = networkBufferPool.createBufferPool(1);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
}
@Test
- public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
- BufferPool fixed = networkBufferPool.createBufferPool(24, true);
-
- BufferPool lbp = networkBufferPool.createBufferPool(1, false);
-
- assertEquals(24, fixed.getNumBuffers());
- assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
- }
-
- @Test
public void testUniformDistribution() throws IOException {
- BufferPool first = networkBufferPool.createBufferPool(0, false);
- BufferPool second = networkBufferPool.createBufferPool(0, false);
+ BufferPool first = networkBufferPool.createBufferPool(0);
+ BufferPool second = networkBufferPool.createBufferPool(0);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
@@ -97,7 +80,7 @@ public class BufferPoolFactoryTest {
int numPools = numBuffers / 32;
for (int i = 0; i < numPools; i++) {
- pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1), random.nextBoolean()));
+ pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1)));
}
int numDistributedBuffers = 0;
@@ -115,11 +98,11 @@ public class BufferPoolFactoryTest {
@Test
public void testCreateDestroy() throws IOException {
- BufferPool first = networkBufferPool.createBufferPool(0, false);
+ BufferPool first = networkBufferPool.createBufferPool(0);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
- BufferPool second = networkBufferPool.createBufferPool(0, false);
+ BufferPool second = networkBufferPool.createBufferPool(0);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index fd5c7a5..ab32685 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -47,15 +47,7 @@ public class NetworkBufferPoolTest {
assertTrue(globalPool.isDestroyed());
try {
- globalPool.createBufferPool(2, true);
- fail("Should throw an IllegalStateException");
- }
- catch (IllegalStateException e) {
- // yippie!
- }
-
- try {
- globalPool.createBufferPool(2, false);
+ globalPool.createBufferPool(2);
fail("Should throw an IllegalStateException");
}
catch (IllegalStateException e) {
@@ -71,26 +63,21 @@ public class NetworkBufferPoolTest {
@Test
public void testDestroyAll() {
try {
- NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+ NetworkBufferPool globalPool = new NetworkBufferPool(8, 128, MemoryType.HEAP);
- BufferPool fixedPool = globalPool.createBufferPool(2, true);
- BufferPool nonFixedPool = globalPool.createBufferPool(5, false);
+ BufferPool lbp = globalPool.createBufferPool(5);
- assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
- assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+ assertEquals(5, lbp.getNumberOfRequiredMemorySegments());
Buffer[] buffers = {
- fixedPool.requestBuffer(),
- fixedPool.requestBuffer(),
-
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer(),
- nonFixedPool.requestBuffer()
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer(),
+ lbp.requestBuffer()
};
for (Buffer b : buffers) {
@@ -98,16 +85,14 @@ public class NetworkBufferPoolTest {
assertNotNull(b.getMemorySegment());
}
- assertNull(fixedPool.requestBuffer());
- assertNull(nonFixedPool.requestBuffer());
+ assertNull(lbp.requestBuffer());
// destroy all allocated ones
globalPool.destroyAllBufferPools();
// check the destroyed status
assertFalse(globalPool.isDestroyed());
- assertTrue(fixedPool.isDestroyed());
- assertTrue(nonFixedPool.isDestroyed());
+ assertTrue(lbp.isDestroyed());
assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
@@ -122,15 +107,7 @@ public class NetworkBufferPoolTest {
// can request no more buffers
try {
- fixedPool.requestBuffer();
- fail("Should fail with an IllegalStateException");
- }
- catch (IllegalStateException e) {
- // that's the way we like it, aha, aha
- }
-
- try {
- nonFixedPool.requestBuffer();
+ lbp.requestBuffer();
fail("Should fail with an IllegalStateException");
}
catch (IllegalStateException e) {
@@ -138,7 +115,7 @@ public class NetworkBufferPoolTest {
}
// can create a new pool now
- assertNotNull(globalPool.createBufferPool(10, false));
+ assertNotNull(globalPool.createBufferPool(8));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index e05fb56..15ff2da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -128,7 +128,7 @@ public class LocalInputChannelTest {
// Create a buffer pool for this partition
partition.registerBufferPool(
- networkBuffers.createBufferPool(producerBufferPoolSize, true));
+ networkBuffers.createBufferPool(producerBufferPoolSize));
// Create the producer
partitionProducers[i] = new TestPartitionProducer(
@@ -162,7 +162,7 @@ public class LocalInputChannelTest {
i,
parallelism,
numberOfBuffersPerChannel,
- networkBuffers.createBufferPool(parallelism, true),
+ networkBuffers.createBufferPool(parallelism),
partitionManager,
new TaskEventDispatcher(),
partitionIds)));
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 74f1adf..9f06c6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.junit.Test;
public class JobGraphTest {
@@ -44,8 +45,8 @@ public class JobGraphTest {
JobVertex source1 = new JobVertex("source1");
JobVertex source2 = new JobVertex("source2");
JobVertex target = new JobVertex("target");
- target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
- target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL);
+ target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
jg.addVertex(source1);
jg.addVertex(source2);
@@ -84,11 +85,11 @@ public class JobGraphTest {
JobVertex intermediate1 = new JobVertex("intermediate1");
JobVertex intermediate2 = new JobVertex("intermediate2");
- target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
- target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
- target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE);
- intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE);
- intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+ target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ intermediate2.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ intermediate1.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph graph = new JobGraph("TestGraph",
source1, source2, intermediate1, intermediate2, target1, target2);
@@ -121,19 +122,19 @@ public class JobGraphTest {
JobVertex l13 = new JobVertex("layer 1 - 3");
JobVertex l2 = new JobVertex("layer 2");
- root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE);
- root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
- root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE);
+ root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
- l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE);
- l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE);
+ l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
- l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
+ l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
- l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE);
- l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+ l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
- l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE);
+ l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph graph = new JobGraph("TestGraph",
source1, source2, root, l11, l13, l12, l2);
@@ -177,10 +178,10 @@ public class JobGraphTest {
JobVertex op2 = new JobVertex("op2");
JobVertex op3 = new JobVertex("op3");
- op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
- op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE);
- op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
- op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE);
+ op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ op3.connectNewDataSetAsInput(op2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
@@ -206,10 +207,10 @@ public class JobGraphTest {
JobVertex v3 = new JobVertex("3");
JobVertex v4 = new JobVertex("4");
- v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
- v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+ v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4);
try {
@@ -236,12 +237,12 @@ public class JobGraphTest {
JobVertex v4 = new JobVertex("4");
JobVertex target = new JobVertex("target");
- v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
- v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE);
- v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE);
- v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
- target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE);
+ v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ target.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
JobGraph jg = new JobGraph("Cyclic Graph", v1, v2, v3, v4, source, target);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 48f06b0..d94b93e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.junit.Test;
@@ -41,7 +42,7 @@ public class JobTaskVertexTest {
public void testConnectDirectly() {
JobVertex source = new JobVertex("source");
JobVertex target = new JobVertex("target");
- target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+ target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
assertTrue(source.isInputVertex());
assertFalse(source.isOutputVertex());
@@ -62,7 +63,7 @@ public class JobTaskVertexTest {
JobVertex source = new JobVertex("source");
JobVertex target1= new JobVertex("target1");
JobVertex target2 = new JobVertex("target2");
- target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
+ target1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
target2.connectDataSetAsInput(source.getProducedDataSets().get(0), DistributionPattern.ALL_TO_ALL);
assertTrue(source.isInputVertex());
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index d1d5f03..62b9b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -64,8 +64,8 @@ public class JsonGeneratorTest {
join2.connectNewDataSetAsInput(join1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
join2.connectNewDataSetAsInput(source3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
- sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE);
- sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL);
+ sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+ sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
JobGraph jg = new JobGraph("my job", source1, source2, source3,
intermediate1, intermediate2, join1, join2, sink1, sink2);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index fe33022..49d0239 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -129,7 +130,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
sender.setParallelism(parallelism);
receiver.setParallelism(parallelism);
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(slotSharingGroup);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 19cc444..7ae9974 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -272,7 +273,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
sender.setParallelism(parallelism);
receiver.setParallelism(parallelism);
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(slotSharingGroup);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 876e908..5a14b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -91,7 +92,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
JobVertex consumer = new JobVertex("AsyncConsumer");
consumer.setParallelism(1);
consumer.setInvokableClass(AsyncConsumer.class);
- consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE);
+ consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
producer.setSlotSharingGroup(slot);
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 12e2d63..d4b4cbf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -19,18 +19,18 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorSystem
-import akka.actor.Status.Success
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
@@ -60,7 +60,8 @@ class CoLocationConstraintITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 31e72dd..5374d01 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings}
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
import org.apache.flink.runtime.jobmanager.Tasks._
@@ -180,7 +181,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
@@ -215,7 +217,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
@@ -251,8 +254,10 @@ class JobManagerITCase(_system: ActorSystem)
sender2.setParallelism(2 * num_tasks)
receiver.setParallelism(3 * num_tasks)
- receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
- receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
@@ -296,8 +301,10 @@ class JobManagerITCase(_system: ActorSystem)
sender2.setParallelism(2 * num_tasks)
receiver.setParallelism(3 * num_tasks)
- receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
- receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
@@ -338,8 +345,10 @@ class JobManagerITCase(_system: ActorSystem)
forwarder.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)
- forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
- receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)
+ forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED)
+ receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
@@ -375,7 +384,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
@@ -423,7 +433,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
@@ -468,7 +479,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
@@ -508,7 +520,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
@@ -556,7 +569,8 @@ class JobManagerITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index b96369f..f3ab409 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,22 +18,23 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import org.scalatest.junit.JUnitRunner
-import scala.concurrent.duration._
+import scala.concurrent.duration._
import language.postfixOps
@RunWith(classOf[JUnitRunner])
@@ -81,7 +82,8 @@ class RecoveryITCase(_system: ActorSystem)
sender.setParallelism(NUM_TASKS)
receiver.setParallelism(NUM_TASKS)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val executionConfig = new ExecutionConfig()
executionConfig.setNumberOfExecutionRetries(1);
@@ -125,7 +127,8 @@ class RecoveryITCase(_system: ActorSystem)
sender.setParallelism(NUM_TASKS)
receiver.setParallelism(NUM_TASKS)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup
sender.setSlotSharingGroup(sharingGroup)
@@ -173,7 +176,8 @@ class RecoveryITCase(_system: ActorSystem)
sender.setParallelism(NUM_TASKS)
receiver.setParallelism(NUM_TASKS)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup
sender.setSlotSharingGroup(sharingGroup)
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index f986e73..4fffd68 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -19,18 +19,18 @@
package org.apache.flink.runtime.jobmanager
import akka.actor.ActorSystem
-import akka.actor.Status.Success
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
-import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
+import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
@@ -60,7 +60,8 @@ class SlotSharingITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
sender.setSlotSharingGroup(sharingGroup)
@@ -107,8 +108,10 @@ class SlotSharingITCase(_system: ActorSystem)
sender2.setSlotSharingGroup(sharingGroup)
receiver.setSlotSharingGroup(sharingGroup)
- receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
- receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
+ receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
+ receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED)
val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index d0136f0..9775d33 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.{Kill, ActorSystem, PoisonPill}
+import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType
+import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
@@ -61,7 +63,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup()
sender.setSlotSharingGroup(sharingGroup)
@@ -110,7 +113,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
sender.setParallelism(num_tasks)
receiver.setParallelism(num_tasks)
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE)
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED)
val sharingGroup = new SlotSharingGroup()
sender.setSlotSharingGroup(sharingGroup)
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6e088f6..ec55f19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
try {
ioMan = new IOManagerAsync();
- BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
- BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, true);
+ BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+ BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
new BufferPool[] { pool1, pool2 },
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 3d0e5ab..b38df61 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -129,11 +130,14 @@ public class NetworkStackThroughputITCase {
consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
if (useForwarder) {
- forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
- consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL);
+ forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED);
+ consumer.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED);
}
else {
- consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL);
+ consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL,
+ ResultPartitionType.PIPELINED);
}
return jobGraph;
http://git-wip-us.apache.org/repos/asf/flink/blob/8b49ee5a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index eacdeb4..df4f370 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -165,7 +166,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
sender.setParallelism(parallelism);
receiver.setParallelism(parallelism);
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+ ResultPartitionType.PIPELINED);
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(slotSharingGroup);
[4/9] flink git commit: [hotfix] [checkpoints] Cleanups in
PendingCheckpoint
Posted by se...@apache.org.
[hotfix] [checkpoints] Cleanups in PendingCheckpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f0f7a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f0f7a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f0f7a7
Branch: refs/heads/master
Commit: 59f0f7a74b60a0d82ea28defb8a3c57f6cd85bea
Parents: 65ccf7c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 28 19:38:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 9 13:00:55 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/PendingCheckpoint.java | 67 +++++++++++---------
1 file changed, 38 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59f0f7a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6c9dbaf..5ca6040 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -18,19 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -51,6 +38,19 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
@@ -78,18 +78,17 @@ public class PendingCheckpoint {
/** Set of acknowledged tasks */
private final Set<ExecutionAttemptID> acknowledgedTasks;
- /**
- * The checkpoint properties. If the checkpoint should be persisted
- * externally, it happens in {@link #finalizeCheckpointExternalized()}.
- */
+ /** The checkpoint properties. If the checkpoint should be persisted
+ * externally, it happens in {@link #finalizeCheckpointExternalized()}. */
private final CheckpointProperties props;
/** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
private final String targetDirectory;
/** The promise to fulfill once the checkpoint has been completed. */
- private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>();
+ private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise;
+ /** The executor for potentially blocking I/O operations, like state disposal */
private final Executor executor;
private int numAcknowledgedTasks;
@@ -110,14 +109,6 @@ public class PendingCheckpoint {
CheckpointProperties props,
String targetDirectory,
Executor executor) {
- this.jobId = checkNotNull(jobId);
- this.checkpointId = checkpointId;
- this.checkpointTimestamp = checkpointTimestamp;
- this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
- this.taskStates = new HashMap<>();
- this.props = checkNotNull(props);
- this.targetDirectory = targetDirectory;
- this.executor = Preconditions.checkNotNull(executor);
// Sanity check
if (props.externalizeCheckpoint() && targetDirectory == null) {
@@ -127,7 +118,17 @@ public class PendingCheckpoint {
checkArgument(verticesToConfirm.size() > 0,
"Checkpoint needs at least one vertex that commits the checkpoint");
- acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
+ this.jobId = checkNotNull(jobId);
+ this.checkpointId = checkpointId;
+ this.checkpointTimestamp = checkpointTimestamp;
+ this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+ this.props = checkNotNull(props);
+ this.targetDirectory = targetDirectory;
+ this.executor = Preconditions.checkNotNull(executor);
+
+ this.taskStates = new HashMap<>();
+ this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
+ this.onCompletionPromise = new FlinkCompletableFuture<>();
}
// --------------------------------------------------------------------------------------------
@@ -193,7 +194,7 @@ public class PendingCheckpoint {
* @param trackerCallback Callback for collecting subtask stats.
*/
void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
- this.statsCallback = checkNotNull(trackerCallback);
+ this.statsCallback = trackerCallback;
}
// ------------------------------------------------------------------------
@@ -289,6 +290,8 @@ public class PendingCheckpoint {
onCompletionPromise.complete(completed);
+ // to prevent null-pointers from concurrent modification, copy reference onto stack
+ PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
@@ -342,7 +345,8 @@ public class PendingCheckpoint {
TaskState taskState = taskStates.get(jobVertexID);
if (null == taskState) {
- ChainedStateHandle<StreamStateHandle> nonPartitionedState =
+ @SuppressWarnings("deprecation")
+ ChainedStateHandle<StreamStateHandle> nonPartitionedState =
subtaskState.getLegacyOperatorState();
ChainedStateHandle<OperatorStateHandle> partitioneableState =
subtaskState.getManagedOperatorState();
@@ -371,6 +375,9 @@ public class PendingCheckpoint {
++numAcknowledgedTasks;
+ // publish the checkpoint statistics
+ // to prevent null-pointers from concurrent modification, copy reference onto stack
+ final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
@@ -493,6 +500,8 @@ public class PendingCheckpoint {
* @param cause The failure cause or <code>null</code>.
*/
private void reportFailedCheckpoint(Exception cause) {
+ // to prevent null-pointers from concurrent modification, copy reference onto stack
+ final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
long failureTimestamp = System.currentTimeMillis();
statsCallback.reportFailedCheckpoint(failureTimestamp, cause);