You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/13 07:52:21 UTC
[6/6] flink git commit: [FLINK-3952][runtine] Upgrade to Netty 4.1
[FLINK-3952][runtine] Upgrade to Netty 4.1
This commit includes possible bug fix to file uploading cleanup in FileUploadHandler and
HttpRequestHandler. For mor information look here:
https://github.com/netty/netty/issues/7611
This closes #6071.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8169cf4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8169cf4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8169cf4e
Branch: refs/heads/master
Commit: 8169cf4ec6d4acb89d8e03fd2b7ac0b4c8c2f267
Parents: cc41285
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 16 21:27:22 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 13 09:51:44 2018 +0200
----------------------------------------------------------------------
.../queryablestate/network/ChunkedByteBuf.java | 20 +
.../queryablestate/network/NettyBufferPool.java | 5 +
.../runtime/webmonitor/HttpRequestHandler.java | 2 -
.../io/network/buffer/NetworkBuffer.java | 70 +
.../buffer/ReadOnlySlicedNetworkBuffer.java | 20 +-
.../flink/runtime/rest/AbstractHandler.java | 2 +-
.../flink/runtime/rest/FileUploadHandler.java | 1 -
.../apache/flink/runtime/rest/RestClient.java | 11 +-
.../rest/handler/router/RoutedRequest.java | 19 +-
.../rest/handler/router/RouterHandler.java | 2 +-
.../io/network/buffer/AbstractByteBufTest.java | 2210 ++++++++++++++++--
.../runtime/io/network/buffer/BufferTest.java | 4 +-
.../buffer/ReadOnlySlicedBufferTest.java | 12 +-
.../netty/NettyMessageSerializationTest.java | 8 +-
.../AbstractTaskManagerFileHandlerTest.java | 8 +-
.../LocalFlinkMiniClusterITCase.java | 6 +-
pom.xml | 10 +-
17 files changed, 2165 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
index 9c56025..60b5799 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
@@ -73,6 +74,15 @@ public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+ return readChunk();
+ }
+
+ @Override
+ public ByteBuf readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
+ return readChunk();
+ }
+
+ private ByteBuf readChunk() {
if (isClosed) {
return null;
} else if (buf.readableBytes() <= chunkSize) {
@@ -89,6 +99,16 @@ public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
}
@Override
+ public long length() {
+ return -1;
+ }
+
+ @Override
+ public long progress() {
+ return buf.readerIndex();
+ }
+
+ @Override
public String toString() {
return "ChunkedByteBuf{" +
"buf=" + buf +
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
index 5e014b8..f475b2a 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
@@ -168,4 +168,9 @@ public class NettyBufferPool implements ByteBufAllocator {
public boolean isDirectBufferPooled() {
return alloc.isDirectBufferPooled();
}
+
+ @Override
+ public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
+ return alloc.calculateNewCapacity(minNewCapacity, maxCapacity);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index a0fda9d..1d4f047 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -146,8 +146,6 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject>
currentRequest.setUri(encoder.toString());
}
}
-
- data.release();
}
}
catch (EndOfDataDecoderException ignored) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d..489be39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -31,6 +31,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@@ -182,22 +183,43 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ protected short _getShortLE(int index) {
+ return memorySegment.getShortLittleEndian(index);
+ }
+
+ @Override
protected int _getUnsignedMedium(int index) {
// from UnpooledDirectByteBuf:
return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
}
@Override
+ protected int _getUnsignedMediumLE(int index) {
+ // from UnpooledDirectByteBuf:
+ return getByte(index) & 255 | (getByte(index + 1) & 255) << 8 | (getByte(index + 2) & 255) << 16;
+ }
+
+ @Override
protected int _getInt(int index) {
return memorySegment.getIntBigEndian(index);
}
@Override
+ protected int _getIntLE(int index) {
+ return memorySegment.getIntLittleEndian(index);
+ }
+
+ @Override
protected long _getLong(int index) {
return memorySegment.getLongBigEndian(index);
}
@Override
+ protected long _getLongLE(int index) {
+ return memorySegment.getLongLittleEndian(index);
+ }
+
+ @Override
protected void _setByte(int index, int value) {
memorySegment.put(index, (byte) value);
}
@@ -208,6 +230,11 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ protected void _setShortLE(int index, int value) {
+ memorySegment.putShortLittleEndian(index, (short) value);
+ }
+
+ @Override
protected void _setMedium(int index, int value) {
// from UnpooledDirectByteBuf:
setByte(index, (byte) (value >>> 16));
@@ -216,16 +243,34 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ protected void _setMediumLE(int index, int value){
+ // from UnpooledDirectByteBuf:
+ setByte(index, (byte) value);
+ setByte(index + 1, (byte) (value >>> 8));
+ setByte(index + 2, (byte) (value >>> 16));
+ }
+
+ @Override
protected void _setInt(int index, int value) {
memorySegment.putIntBigEndian(index, value);
}
@Override
+ protected void _setIntLE(int index, int value) {
+ memorySegment.putIntLittleEndian(index, value);
+ }
+
+ @Override
protected void _setLong(int index, long value) {
memorySegment.putLongBigEndian(index, value);
}
@Override
+ protected void _setLongLE(int index, long value) {
+ memorySegment.putLongLittleEndian(index, value);
+ }
+
+ @Override
public int capacity() {
return currentSize;
}
@@ -357,6 +402,18 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ // adapted from UnpooledDirectByteBuf:
+ checkIndex(index, length);
+ if (length == 0) {
+ return 0;
+ }
+
+ ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+ return out.write(tmpBuf, position);
+ }
+
+ @Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
// from UnpooledDirectByteBuf:
checkSrcIndex(index, length, srcIndex, src.capacity());
@@ -425,6 +482,19 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+ // adapted from UnpooledDirectByteBuf:
+ checkIndex(index, length);
+
+ ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+ try {
+ return in.read(tmpBuf, position);
+ } catch (ClosedChannelException ignored) {
+ return -1;
+ }
+ }
+
+ @Override
public ByteBufAllocator alloc() {
return checkNotNull(allocator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index 52fb57a..00e1154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -75,12 +75,12 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
@Override
public ByteBuf unwrap() {
- return super.unwrap().unwrap();
+ return super.unwrap();
}
@Override
public boolean isBuffer() {
- return ((Buffer) unwrap()).isBuffer();
+ return getBuffer().isBuffer();
}
@Override
@@ -98,7 +98,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
*/
@Override
public MemorySegment getMemorySegment() {
- return ((Buffer) unwrap()).getMemorySegment();
+ return getBuffer().getMemorySegment();
}
@Override
@@ -108,22 +108,22 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
@Override
public BufferRecycler getRecycler() {
- return ((Buffer) unwrap()).getRecycler();
+ return getBuffer().getRecycler();
}
@Override
public void recycleBuffer() {
- ((Buffer) unwrap()).recycleBuffer();
+ getBuffer().recycleBuffer();
}
@Override
public boolean isRecycled() {
- return ((Buffer) unwrap()).isRecycled();
+ return getBuffer().isRecycled();
}
@Override
public ReadOnlySlicedNetworkBuffer retainBuffer() {
- ((Buffer) unwrap()).retainBuffer();
+ getBuffer().retainBuffer();
return this;
}
@@ -203,11 +203,15 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
@Override
public void setAllocator(ByteBufAllocator allocator) {
- ((Buffer) unwrap()).setAllocator(allocator);
+ getBuffer().setAllocator(allocator);
}
@Override
public ByteBuf asByteBuf() {
return this;
}
+
+ private Buffer getBuffer() {
+ return ((Buffer) unwrap().unwrap());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index e785def..7246b7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -86,7 +86,7 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
HttpRequest httpRequest = routedRequest.getRequest();
if (log.isTraceEnabled()) {
- log.trace("Received request " + httpRequest.getUri() + '.');
+ log.trace("Received request " + httpRequest.uri() + '.');
}
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 8854a1f..2f38e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -100,7 +100,6 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
fileUpload.renameTo(dest.toFile());
ctx.channel().attr(UPLOADED_FILE).set(dest);
}
- data.release();
}
if (httpContent instanceof LastHttpContent) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e812d3..a63bf5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -75,6 +75,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
+
/**
* This client is the counter-part to the {@link RestServerEndpoint}.
*/
@@ -247,7 +249,14 @@ public class RestClient {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof FullHttpResponse) {
+ if (msg instanceof HttpResponse && ((HttpResponse) msg).status().equals(REQUEST_ENTITY_TOO_LARGE)) {
+ jsonFuture.completeExceptionally(
+ new RestClientException(
+ String.format(
+ REQUEST_ENTITY_TOO_LARGE + ". Try to raise [%s]",
+ RestOptions.CLIENT_MAX_CONTENT_LENGTH.key()),
+ ((HttpResponse) msg).status()));
+ } else if (msg instanceof FullHttpResponse) {
readRawResponse((FullHttpResponse) msg);
} else {
LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
index 96f5a3e..5c3f077 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.router;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCounted;
import java.util.Optional;
@@ -40,7 +41,7 @@ public class RoutedRequest<T> implements ReferenceCounted {
this.result = checkNotNull(result);
this.request = checkNotNull(request);
this.requestAsReferenceCounted = Optional.ofNullable((request instanceof ReferenceCounted) ? (ReferenceCounted) request : null);
- this.queryStringDecoder = new QueryStringDecoder(request.getUri());
+ this.queryStringDecoder = new QueryStringDecoder(request.uri());
}
public RouteResult<T> getRouteResult() {
@@ -94,4 +95,20 @@ public class RoutedRequest<T> implements ReferenceCounted {
}
return this;
}
+
+ @Override
+ public ReferenceCounted touch() {
+ if (requestAsReferenceCounted.isPresent()) {
+ ReferenceCountUtil.touch(requestAsReferenceCounted.get());
+ }
+ return this;
+ }
+
+ @Override
+ public ReferenceCounted touch(Object hint) {
+ if (requestAsReferenceCounted.isPresent()) {
+ ReferenceCountUtil.touch(requestAsReferenceCounted.get(), hint);
+ }
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
index 2e8200e..cdfd169 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
@@ -73,7 +73,7 @@ public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
// Route
HttpMethod method = httpRequest.getMethod();
- QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri());
+ QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.uri());
RouteResult<?> routeResult = router.route(method, qsd.path(), qsd.parameters());
if (routeResult == null) {