You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/08/10 15:43:09 UTC

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4517

    [FLINK-7411][network] minor performance improvements in NettyMessage

    ## What is the purpose of the change
    
    This PR adds some (minor) performance improvements to `NettyMessage` which I came across: using a `switch` rather than multiple `if...ifelse...ifelse...` avoiding unneeded virtual method calls.
    
    ## Brief change log
    
    - use a switch rather than multiple if conditions
    - use static `readFrom` methods to create instances of the message sub types
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `NettyMessageSerializationTest`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes, if you consider network communication part of this)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7411

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4517
    
----
commit b33e036b43d00ddf564f105280894f6287dd3e92
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-07T15:38:36Z

    [FLINK-7411][network] minor (performance) improvements in NettyMessage
    
    * use a switch rather than multiple if conditions
    * use static `readFrom` methods to create instances of the message sub types

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4517


---

[GitHub] flink issue #4517: [FLINK-7411][network] minor performance improvements in N...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4517
  
    how about now?


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r141345178
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -242,21 +239,22 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
     				throw new IOException(t);
     			}
     			finally {
    -				if (buffer != null) {
    -					buffer.recycle();
    -				}
    +				buffer.recycle();
     			}
     		}
     
    -		@Override
    -		void readFrom(ByteBuf buffer) {
    -			receiverId = InputChannelID.fromByteBuf(buffer);
    -			sequenceNumber = buffer.readInt();
    -			isBuffer = buffer.readBoolean();
    -			size = buffer.readInt();
    -
    -			retainedSlice = buffer.readSlice(size);
    -			retainedSlice.retain();
    +		static BufferResponse readFrom(ByteBuf buffer) {
    +			BufferResponse result = new BufferResponse();
    --- End diff --
    
    I agree we could change that, too - and partly I did in another commit that didn't go into a PR yet. Let's do this for all classes then.


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136836067
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -220,6 +216,7 @@ void releaseBuffer() {
     
     		@Override
     		ByteBuf write(ByteBufAllocator allocator) throws IOException {
    +			assert buffer != null; // see BufferResponse()
    --- End diff --
    
    `checkState`


---

[GitHub] flink issue #4517: [FLINK-7411][network] minor performance improvements in N...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4517
  
    Thanks for your contribution @NicoK and the review @pnowojski. Changes look good to me. Will wait for Travis and then merge this PR.


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136827653
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -130,34 +128,31 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
     
     			byte msgId = msg.readByte();
     
    -			NettyMessage decodedMsg = null;
    -
    -			if (msgId == BufferResponse.ID) {
    -				decodedMsg = new BufferResponse();
    -			}
    -			else if (msgId == PartitionRequest.ID) {
    -				decodedMsg = new PartitionRequest();
    -			}
    -			else if (msgId == TaskEventRequest.ID) {
    -				decodedMsg = new TaskEventRequest();
    -			}
    -			else if (msgId == ErrorResponse.ID) {
    -				decodedMsg = new ErrorResponse();
    -			}
    -			else if (msgId == CancelPartitionRequest.ID) {
    -				decodedMsg = new CancelPartitionRequest();
    -			}
    -			else if (msgId == CloseRequest.ID) {
    -				decodedMsg = new CloseRequest();
    -			}
    -			else {
    -				throw new IllegalStateException("Received unknown message from producer: " + msg);
    +			final NettyMessage decodedMsg;
    +			switch (msgId) {
    +				case BufferResponse.ID:
    +					decodedMsg = BufferResponse.readFrom(msg);
    +					break;
    +				case PartitionRequest.ID:
    +					decodedMsg = PartitionRequest.readFrom(msg);
    +					break;
    +				case TaskEventRequest.ID:
    +					decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
    +					break;
    +				case ErrorResponse.ID:
    +					decodedMsg = ErrorResponse.readFrom(msg);
    +					break;
    +				case CancelPartitionRequest.ID:
    +					decodedMsg = CancelPartitionRequest.readFrom(msg);
    +					break;
    +				case CloseRequest.ID:
    +					decodedMsg = CloseRequest.readFrom(msg);
    +					break;
    +				default:
    +					throw new IllegalStateException("Received unknown message from producer: " + msg);
    --- End diff --
    
    nit: I would change the exception to `UnsupportedOperationException`, which is more natural fit hear. Backward compatibility is not something that we should worry about with `IllegalStateException` :)


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r141341300
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -130,34 +128,31 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
     
     			byte msgId = msg.readByte();
     
    -			NettyMessage decodedMsg = null;
    -
    -			if (msgId == BufferResponse.ID) {
    -				decodedMsg = new BufferResponse();
    -			}
    -			else if (msgId == PartitionRequest.ID) {
    -				decodedMsg = new PartitionRequest();
    -			}
    -			else if (msgId == TaskEventRequest.ID) {
    -				decodedMsg = new TaskEventRequest();
    -			}
    -			else if (msgId == ErrorResponse.ID) {
    -				decodedMsg = new ErrorResponse();
    -			}
    -			else if (msgId == CancelPartitionRequest.ID) {
    -				decodedMsg = new CancelPartitionRequest();
    -			}
    -			else if (msgId == CloseRequest.ID) {
    -				decodedMsg = new CloseRequest();
    -			}
    -			else {
    -				throw new IllegalStateException("Received unknown message from producer: " + msg);
    +			final NettyMessage decodedMsg;
    +			switch (msgId) {
    +				case BufferResponse.ID:
    +					decodedMsg = BufferResponse.readFrom(msg);
    +					break;
    +				case PartitionRequest.ID:
    +					decodedMsg = PartitionRequest.readFrom(msg);
    +					break;
    +				case TaskEventRequest.ID:
    +					decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
    +					break;
    +				case ErrorResponse.ID:
    +					decodedMsg = ErrorResponse.readFrom(msg);
    +					break;
    +				case CancelPartitionRequest.ID:
    +					decodedMsg = CancelPartitionRequest.readFrom(msg);
    +					break;
    +				case CloseRequest.ID:
    +					decodedMsg = CloseRequest.readFrom(msg);
    +					break;
    +				default:
    +					throw new IllegalStateException("Received unknown message from producer: " + msg);
    --- End diff --
    
    I agree that `IllegalStateException` is not the best fit here, but also, `UnsupportedOperationException` is not - what do you think about `ProtocolException`?


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136831143
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -378,11 +373,14 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
     			}
     		}
     
    -		@Override
    -		public void readFrom(ByteBuf buffer) {
    -			partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
    -			queueIndex = buffer.readInt();
    -			receiverId = InputChannelID.fromByteBuf(buffer);
    +		static PartitionRequest readFrom(ByteBuf buffer) {
    +			PartitionRequest result = new PartitionRequest();
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136831169
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -439,20 +437,23 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
     			}
     		}
     
    -		@Override
    -		public void readFrom(ByteBuf buffer) throws IOException {
    +		static TaskEventRequest readFrom(ByteBuf buffer, ClassLoader classLoader) throws IOException {
    +			TaskEventRequest result = new TaskEventRequest();
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136830945
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -242,21 +239,22 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
     				throw new IOException(t);
     			}
     			finally {
    -				if (buffer != null) {
    -					buffer.recycle();
    -				}
    +				buffer.recycle();
     			}
     		}
     
    -		@Override
    -		void readFrom(ByteBuf buffer) {
    -			receiverId = InputChannelID.fromByteBuf(buffer);
    -			sequenceNumber = buffer.readInt();
    -			isBuffer = buffer.readBoolean();
    -			size = buffer.readInt();
    -
    -			retainedSlice = buffer.readSlice(size);
    -			retainedSlice.retain();
    +		static BufferResponse readFrom(ByteBuf buffer) {
    +			BufferResponse result = new BufferResponse();
    --- End diff --
    
    this is somehow strange construct, with this default empty constructor and manually modifying fields afterwards. Why not:
    ```
    InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
    int sequenceNumber = buffer.readInt();
    boolean isBuffer = buffer.readBoolean();
    int size = buffer.readInt();
    
    return new BufferResponse(
      receivedId,
      sequenceNumber,
      isBuffer,
      size,
      buffer.readSlice(size).retain());
    ```
    
    would be more natural, would be easier for the finding usages of the fields and that way you could make all of the fields `final`, which is nice feature on its own.


---

[GitHub] flink pull request #4517: [FLINK-7411][network] minor performance improvemen...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r136831126
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -313,20 +308,20 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
     			}
     		}
     
    -		@Override
    -		void readFrom(ByteBuf buffer) throws Exception {
    +		static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
     			try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
     				Object obj = ois.readObject();
     
     				if (!(obj instanceof Throwable)) {
     					throw new ClassCastException("Read object expected to be of type Throwable, " +
     							"actual type is " + obj.getClass() + ".");
     				} else {
    -					cause = (Throwable) obj;
    +					ErrorResponse result = new ErrorResponse((Throwable) obj);
     
     					if (buffer.readBoolean()) {
    -						receiverId = InputChannelID.fromByteBuf(buffer);
    +						result.receiverId = InputChannelID.fromByteBuf(buffer);
    --- End diff --
    
    ditto


---