Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/06/18 06:49:04 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #8471: [FLINK-12529][runtime] Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager

pnowojski commented on a change in pull request #8471: [FLINK-12529][runtime] Release record-deserializer buffers timely to improve the efficiency of heap usage on taskmanager
URL: https://github.com/apache/flink/pull/8471#discussion_r294630598
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 ##########
 @@ -307,21 +307,35 @@ private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOExceptio
 			if (event.getClass() != EndOfPartitionEvent.class) {
 				throw new IOException("Unexpected event: " + event);
 			}
+
+			// release the record deserializer immediately,
+			// which is very valuable in case of bounded stream
+			releaseDeserializer(bufferOrEvent.getChannelIndex());
 		}
 	}
 
 	public void cleanup() throws IOException {
-		// clear the buffers first. this part should not ever fail
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+		// release the deserializers first. this part should not ever fail
+		for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
+			releaseDeserializer(channelIndex);
+		}
+
+		// cleanup the barrier handler resources
+		barrierHandler.cleanup();
+	}
+
+	private void releaseDeserializer(int channelIndex) {
+		// recycle buffers and clear the deserializer.
+		RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
+		if (deserializer != null) {
 
 Review comment:
   Instead of introducing another `@Nullable` field, can't we just always clear the deserializer? To me it looks like `releaseDeserializer` (with `deserializer.getCurrentBuffer()` + `deserializer.clear()`) should already be an idempotent method. If so, could you:
   
   1. not set `recordDeserializers[channelIndex] = null;`
   2. rename `releaseDeserializer` to `clearDeserializer`
   
   Please do the same for the `StreamTaskNetworkInput` class.
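
A minimal sketch of what this suggestion could look like, assuming that clearing a deserializer drops its reference to the current buffer, so a second call finds nothing left to recycle. `FakeBuffer` and `FakeDeserializer` are hypothetical stand-ins written only for this illustration; they are not Flink classes, and the real `RecordDeserializer` semantics would need to be checked before relying on this.

```java
// Sketch only: FakeBuffer and FakeDeserializer are hypothetical stand-ins,
// not the Flink Buffer / RecordDeserializer types.
final class ClearDeserializerSketch {

    /** Stand-in buffer that counts how many times it has been recycled. */
    static final class FakeBuffer {
        private int recycleCount = 0;

        boolean isRecycled() {
            return recycleCount > 0;
        }

        void recycleBuffer() {
            recycleCount++;
        }

        int getRecycleCount() {
            return recycleCount;
        }
    }

    /** Stand-in deserializer; assumes clear() drops the current buffer reference. */
    static final class FakeDeserializer {
        private FakeBuffer currentBuffer;

        void setNextBuffer(FakeBuffer buffer) {
            currentBuffer = buffer;
        }

        FakeBuffer getCurrentBuffer() {
            return currentBuffer;
        }

        void clear() {
            currentBuffer = null;
        }
    }

    // Entries are never set to null, so no @Nullable handling is needed.
    private final FakeDeserializer[] recordDeserializers;

    ClearDeserializerSketch(int numberOfChannels) {
        recordDeserializers = new FakeDeserializer[numberOfChannels];
        for (int i = 0; i < numberOfChannels; i++) {
            recordDeserializers[i] = new FakeDeserializer();
        }
    }

    FakeDeserializer deserializer(int channelIndex) {
        return recordDeserializers[channelIndex];
    }

    /** Recycles the current buffer (if any) and clears the deserializer; safe to call repeatedly. */
    void clearDeserializer(int channelIndex) {
        FakeDeserializer deserializer = recordDeserializers[channelIndex];
        FakeBuffer buffer = deserializer.getCurrentBuffer();
        if (buffer != null && !buffer.isRecycled()) {
            buffer.recycleBuffer();
        }
        deserializer.clear();
    }

    /** Clears every channel; channels that were already cleared are unaffected. */
    void cleanup() {
        for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
            clearDeserializer(channelIndex);
        }
    }

    public static void main(String[] args) {
        ClearDeserializerSketch sketch = new ClearDeserializerSketch(2);
        FakeBuffer buffer = new FakeBuffer();
        sketch.deserializer(0).setNextBuffer(buffer);

        sketch.clearDeserializer(0); // e.g. on end of partition
        sketch.cleanup();            // later, on task cleanup

        System.out.println("recycle count: " + buffer.getRecycleCount());
    }
}
```

In this sketch the early call on end of partition and the later `cleanup()` can both touch the same channel without a null check, because the second call sees no current buffer. Whether that holds for the real deserializers is exactly the question raised above.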
