You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/13 07:52:21 UTC

[6/6] flink git commit: [FLINK-3952][runtine] Upgrade to Netty 4.1

[FLINK-3952][runtine] Upgrade to Netty 4.1

This commit includes possible bug fix to file uploading cleanup in FileUploadHandler and
HttpRequestHandler. For mor information look here:

https://github.com/netty/netty/issues/7611

This closes #6071.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8169cf4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8169cf4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8169cf4e

Branch: refs/heads/master
Commit: 8169cf4ec6d4acb89d8e03fd2b7ac0b4c8c2f267
Parents: cc41285
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 16 21:27:22 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 13 09:51:44 2018 +0200

----------------------------------------------------------------------
 .../queryablestate/network/ChunkedByteBuf.java  |   20 +
 .../queryablestate/network/NettyBufferPool.java |    5 +
 .../runtime/webmonitor/HttpRequestHandler.java  |    2 -
 .../io/network/buffer/NetworkBuffer.java        |   70 +
 .../buffer/ReadOnlySlicedNetworkBuffer.java     |   20 +-
 .../flink/runtime/rest/AbstractHandler.java     |    2 +-
 .../flink/runtime/rest/FileUploadHandler.java   |    1 -
 .../apache/flink/runtime/rest/RestClient.java   |   11 +-
 .../rest/handler/router/RoutedRequest.java      |   19 +-
 .../rest/handler/router/RouterHandler.java      |    2 +-
 .../io/network/buffer/AbstractByteBufTest.java  | 2210 ++++++++++++++++--
 .../runtime/io/network/buffer/BufferTest.java   |    4 +-
 .../buffer/ReadOnlySlicedBufferTest.java        |   12 +-
 .../netty/NettyMessageSerializationTest.java    |    8 +-
 .../AbstractTaskManagerFileHandlerTest.java     |    8 +-
 .../LocalFlinkMiniClusterITCase.java            |    6 +-
 pom.xml                                         |   10 +-
 17 files changed, 2165 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
index 9c56025..60b5799 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
@@ -73,6 +74,15 @@ public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
 
 	@Override
 	public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+		return readChunk();
+	}
+
+	@Override
+	public ByteBuf readChunk(ByteBufAllocator byteBufAllocator) throws Exception {
+		return readChunk();
+	}
+
+	private ByteBuf readChunk() {
 		if (isClosed) {
 			return null;
 		} else if (buf.readableBytes() <= chunkSize) {
@@ -89,6 +99,16 @@ public class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
 	}
 
 	@Override
+	public long length() {
+		return -1;
+	}
+
+	@Override
+	public long progress() {
+		return buf.readerIndex();
+	}
+
+	@Override
 	public String toString() {
 		return "ChunkedByteBuf{" +
 				"buf=" + buf +

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
index 5e014b8..f475b2a 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java
@@ -168,4 +168,9 @@ public class NettyBufferPool implements ByteBufAllocator {
 	public boolean isDirectBufferPooled() {
 		return alloc.isDirectBufferPooled();
 	}
+
+	@Override
+	public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
+		return alloc.calculateNewCapacity(minNewCapacity, maxCapacity);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
index a0fda9d..1d4f047 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java
@@ -146,8 +146,6 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject>
 								currentRequest.setUri(encoder.toString());
 							}
 						}
-
-						data.release();
 					}
 				}
 				catch (EndOfDataDecoderException ignored) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d..489be39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -31,6 +31,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 
@@ -182,22 +183,43 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	protected short _getShortLE(int index) {
+		return memorySegment.getShortLittleEndian(index);
+	}
+
+	@Override
 	protected int _getUnsignedMedium(int index) {
 		// from UnpooledDirectByteBuf:
 		return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
 	}
 
 	@Override
+	protected int _getUnsignedMediumLE(int index) {
+		// from UnpooledDirectByteBuf:
+		return getByte(index) & 255 | (getByte(index + 1) & 255) << 8 | (getByte(index + 2) & 255) << 16;
+	}
+
+	@Override
 	protected int _getInt(int index) {
 		return memorySegment.getIntBigEndian(index);
 	}
 
 	@Override
+	protected int _getIntLE(int index) {
+		return memorySegment.getIntLittleEndian(index);
+	}
+
+	@Override
 	protected long _getLong(int index) {
 		return memorySegment.getLongBigEndian(index);
 	}
 
 	@Override
+	protected long _getLongLE(int index) {
+		return memorySegment.getLongLittleEndian(index);
+	}
+
+	@Override
 	protected void _setByte(int index, int value) {
 		memorySegment.put(index, (byte) value);
 	}
@@ -208,6 +230,11 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	protected void _setShortLE(int index, int value) {
+		memorySegment.putShortLittleEndian(index, (short) value);
+	}
+
+	@Override
 	protected void _setMedium(int index, int value) {
 		// from UnpooledDirectByteBuf:
 		setByte(index, (byte) (value >>> 16));
@@ -216,16 +243,34 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	protected void _setMediumLE(int index, int value){
+		// from UnpooledDirectByteBuf:
+		setByte(index, (byte) value);
+		setByte(index + 1, (byte) (value >>> 8));
+		setByte(index + 2, (byte) (value >>> 16));
+	}
+
+	@Override
 	protected void _setInt(int index, int value) {
 		memorySegment.putIntBigEndian(index, value);
 	}
 
 	@Override
+	protected void _setIntLE(int index, int value) {
+		memorySegment.putIntLittleEndian(index, value);
+	}
+
+	@Override
 	protected void _setLong(int index, long value) {
 		memorySegment.putLongBigEndian(index, value);
 	}
 
 	@Override
+	protected void _setLongLE(int index, long value) {
+		memorySegment.putLongLittleEndian(index, value);
+	}
+
+	@Override
 	public int capacity() {
 		return currentSize;
 	}
@@ -357,6 +402,18 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf:
+		checkIndex(index, length);
+		if (length == 0) {
+			return 0;
+		}
+
+		ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+		return out.write(tmpBuf, position);
+	}
+
+	@Override
 	public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
 		// from UnpooledDirectByteBuf:
 		checkSrcIndex(index, length, srcIndex, src.capacity());
@@ -425,6 +482,19 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf:
+		checkIndex(index, length);
+
+		ByteBuffer tmpBuf = memorySegment.wrap(index, length);
+		try {
+			return in.read(tmpBuf, position);
+		} catch (ClosedChannelException ignored) {
+			return -1;
+		}
+	}
+
+	@Override
 	public ByteBufAllocator alloc() {
 		return checkNotNull(allocator);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index 52fb57a..00e1154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -75,12 +75,12 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
 
 	@Override
 	public ByteBuf unwrap() {
-		return super.unwrap().unwrap();
+		return super.unwrap();
 	}
 
 	@Override
 	public boolean isBuffer() {
-		return ((Buffer) unwrap()).isBuffer();
+		return getBuffer().isBuffer();
 	}
 
 	@Override
@@ -98,7 +98,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
 	 */
 	@Override
 	public MemorySegment getMemorySegment() {
-		return ((Buffer) unwrap()).getMemorySegment();
+		return getBuffer().getMemorySegment();
 	}
 
 	@Override
@@ -108,22 +108,22 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
 
 	@Override
 	public BufferRecycler getRecycler() {
-		return ((Buffer) unwrap()).getRecycler();
+		return getBuffer().getRecycler();
 	}
 
 	@Override
 	public void recycleBuffer() {
-		((Buffer) unwrap()).recycleBuffer();
+		getBuffer().recycleBuffer();
 	}
 
 	@Override
 	public boolean isRecycled() {
-		return ((Buffer) unwrap()).isRecycled();
+		return getBuffer().isRecycled();
 	}
 
 	@Override
 	public ReadOnlySlicedNetworkBuffer retainBuffer() {
-		((Buffer) unwrap()).retainBuffer();
+		getBuffer().retainBuffer();
 		return this;
 	}
 
@@ -203,11 +203,15 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
 
 	@Override
 	public void setAllocator(ByteBufAllocator allocator) {
-		((Buffer) unwrap()).setAllocator(allocator);
+		getBuffer().setAllocator(allocator);
 	}
 
 	@Override
 	public ByteBuf asByteBuf() {
 		return this;
 	}
+
+	private Buffer getBuffer() {
+		return ((Buffer) unwrap().unwrap());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index e785def..7246b7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -86,7 +86,7 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
 	protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
 		HttpRequest httpRequest = routedRequest.getRequest();
 		if (log.isTraceEnabled()) {
-			log.trace("Received request " + httpRequest.getUri() + '.');
+			log.trace("Received request " + httpRequest.uri() + '.');
 		}
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 8854a1f..2f38e65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -100,7 +100,6 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 					fileUpload.renameTo(dest.toFile());
 					ctx.channel().attr(UPLOADED_FILE).set(dest);
 				}
-				data.release();
 			}
 
 			if (httpContent instanceof LastHttpContent) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2e812d3..a63bf5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -75,6 +75,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE;
+
 /**
  * This client is the counter-part to the {@link RestServerEndpoint}.
  */
@@ -247,7 +249,14 @@ public class RestClient {
 
 		@Override
 		protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
-			if (msg instanceof FullHttpResponse) {
+			if (msg instanceof HttpResponse && ((HttpResponse) msg).status().equals(REQUEST_ENTITY_TOO_LARGE)) {
+				jsonFuture.completeExceptionally(
+					new RestClientException(
+						String.format(
+							REQUEST_ENTITY_TOO_LARGE + ". Try to raise [%s]",
+							RestOptions.CLIENT_MAX_CONTENT_LENGTH.key()),
+						((HttpResponse) msg).status()));
+			} else if (msg instanceof FullHttpResponse) {
 				readRawResponse((FullHttpResponse) msg);
 			} else {
 				LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
index 96f5a3e..5c3f077 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.router;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
 import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCounted;
 
 import java.util.Optional;
@@ -40,7 +41,7 @@ public class RoutedRequest<T> implements ReferenceCounted {
 		this.result = checkNotNull(result);
 		this.request = checkNotNull(request);
 		this.requestAsReferenceCounted = Optional.ofNullable((request instanceof ReferenceCounted) ? (ReferenceCounted) request : null);
-		this.queryStringDecoder = new QueryStringDecoder(request.getUri());
+		this.queryStringDecoder = new QueryStringDecoder(request.uri());
 	}
 
 	public RouteResult<T> getRouteResult() {
@@ -94,4 +95,20 @@ public class RoutedRequest<T> implements ReferenceCounted {
 		}
 		return this;
 	}
+
+	@Override
+	public ReferenceCounted touch() {
+		if (requestAsReferenceCounted.isPresent()) {
+			ReferenceCountUtil.touch(requestAsReferenceCounted.get());
+		}
+		return this;
+	}
+
+	@Override
+	public ReferenceCounted touch(Object hint) {
+		if (requestAsReferenceCounted.isPresent()) {
+			ReferenceCountUtil.touch(requestAsReferenceCounted.get(), hint);
+		}
+		return this;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8169cf4e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
index 2e8200e..cdfd169 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RouterHandler.java
@@ -73,7 +73,7 @@ public class RouterHandler extends SimpleChannelInboundHandler<HttpRequest> {
 
 		// Route
 		HttpMethod method = httpRequest.getMethod();
-		QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.getUri());
+		QueryStringDecoder qsd = new QueryStringDecoder(httpRequest.uri());
 		RouteResult<?> routeResult = router.route(method, qsd.path(), qsd.parameters());
 
 		if (routeResult == null) {