You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 21:32:36 UTC

[2/6] flink git commit: [FLINK-7661][network] Add credit field in PartitionRequest message

[FLINK-7661][network] Add credit field in PartitionRequest message

This closes #4698.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891f359d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891f359d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891f359d

Branch: refs/heads/master
Commit: 891f359d710146acf3d05cd2af3bb430a8fbc99b
Parents: eef0db0
Author: Zhijiang <wa...@aliyun.com>
Authored: Thu Sep 21 16:28:16 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:58 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/netty/NettyMessage.java     | 11 ++++++++---
 .../runtime/io/network/netty/PartitionRequestClient.java |  2 +-
 .../network/partition/consumer/RemoteInputChannel.java   |  4 ++++
 .../io/network/netty/CancelPartitionRequestTest.java     |  4 ++--
 .../io/network/netty/NettyMessageSerializationTest.java  |  3 ++-
 .../network/netty/ServerTransportErrorHandlingTest.java  |  2 +-
 6 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index d7ddfa6..c035010 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -358,10 +358,13 @@ abstract class NettyMessage {
 
 		final InputChannelID receiverId;
 
-		PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
+		final int credit;
+
+		PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, int credit) {
 			this.partitionId = checkNotNull(partitionId);
 			this.queueIndex = queueIndex;
 			this.receiverId = checkNotNull(receiverId);
+			this.credit = credit;
 		}
 
 		@Override
@@ -369,12 +372,13 @@ abstract class NettyMessage {
 			ByteBuf result = null;
 
 			try {
-				result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);
+				result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4);
 
 				partitionId.getPartitionId().writeTo(result);
 				partitionId.getProducerId().writeTo(result);
 				result.writeInt(queueIndex);
 				receiverId.writeTo(result);
+				result.writeInt(credit);
 
 				return result;
 			}
@@ -394,8 +398,9 @@ abstract class NettyMessage {
 					ExecutionAttemptID.fromByteBuf(buffer));
 			int queueIndex = buffer.readInt();
 			InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+			int credit = buffer.readInt();
 
-			return new PartitionRequest(partitionId, queueIndex, receiverId);
+			return new PartitionRequest(partitionId, queueIndex, receiverId, credit);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 7850974..8dbc6b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -106,7 +106,7 @@ public class PartitionRequestClient {
 		partitionRequestHandler.addInputChannel(inputChannel);
 
 		final PartitionRequest request = new PartitionRequest(
-				partitionId, subpartitionIndex, inputChannel.getInputChannelId());
+				partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
 
 		final ChannelFutureListener listener = new ChannelFutureListener() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1eaef..4c156df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -382,6 +382,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		return id;
 	}
 
+	public int getInitialCredit() {
+		return initialCredit;
+	}
+
 	public BufferProvider getBufferProvider() throws IOException {
 		if (isReleased.get()) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 12f5064..912fae2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -97,7 +97,7 @@ public class CancelPartitionRequestTest {
 			Channel ch = connect(serverAndClient);
 
 			// Request for non-existing input channel => results in cancel request
-			ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID())).await();
+			ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID(), 2)).await();
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
@@ -150,7 +150,7 @@ public class CancelPartitionRequestTest {
 			// Request for non-existing input channel => results in cancel request
 			InputChannelID inputChannelId = new InputChannelID();
 
-			ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId)).await();
+			ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId, 2)).await();
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 8200caa..0651f97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -126,12 +126,13 @@ public class NettyMessageSerializationTest {
 		}
 
 		{
-			NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID());
+			NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID(), random.nextInt());
 			NettyMessage.PartitionRequest actual = encodeAndDecode(expected);
 
 			assertEquals(expected.partitionId, actual.partitionId);
 			assertEquals(expected.queueIndex, actual.queueIndex);
 			assertEquals(expected.receiverId, actual.receiverId);
+			assertEquals(expected.credit, actual.credit);
 		}
 
 		{

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 01a0b5f..d365fba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -106,7 +106,7 @@ public class ServerTransportErrorHandlingTest {
 			Channel ch = connect(serverAndClient);
 
 			// Write something to trigger close by server
-			ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+			ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID(), 2));
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {