You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/26 10:36:44 UTC

[2/2] flink git commit: [hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames

[hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d46c9d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d46c9d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d46c9d3

Branch: refs/heads/master
Commit: 1d46c9d3623519e92b9cd7f147f15cbde527fb65
Parents: 1db14fc
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:56:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 26 12:36:12 2016 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 233 +------------------
 1 file changed, 10 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d46c9d3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2b03f1d..b97bd82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -28,10 +28,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -289,17 +286,9 @@ abstract class NettyMessage {
 
 		@Override
 		ByteBuf write(ByteBufAllocator allocator) throws IOException {
-			ByteBuf result = null;
-
-			ObjectOutputStream oos = null;
-
-			try {
-				result = allocateBuffer(allocator, ID);
-
-				DataOutputView outputView = new ByteBufDataOutputView(result);
-
-				oos = new ObjectOutputStream(new DataOutputViewStream(outputView));
+			final ByteBuf result = allocateBuffer(allocator, ID);
 
+			try (ObjectOutputStream oos = new ObjectOutputStream(new ByteBufOutputStream(result))) {
 				oos.writeObject(cause);
 
 				if (receiverId != null) {
@@ -311,30 +300,22 @@ abstract class NettyMessage {
 
 				// Update frame length...
 				result.setInt(0, result.readableBytes());
+				return result;
 			}
 			catch (Throwable t) {
-				if (result != null) {
-					result.release();
-				}
+				result.release();
 
-				throw new IOException(t);
-			} finally {
-				if(oos != null) {
-					oos.close();
+				if (t instanceof IOException) {
+					throw (IOException) t;
+				} else {
+					throw new IOException(t);
 				}
 			}
-
-			return result;
 		}
 
 		@Override
 		void readFrom(ByteBuf buffer) throws Exception {
-			DataInputView inputView = new ByteBufDataInputView(buffer);
-			ObjectInputStream ois = null;
-
-			try {
-				ois = new ObjectInputStream(new DataInputViewStream(inputView));
-
+			try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
 				Object obj = ois.readObject();
 
 				if (!(obj instanceof Throwable)) {
@@ -347,10 +328,6 @@ abstract class NettyMessage {
 						receiverId = InputChannelID.fromByteBuf(buffer);
 					}
 				}
-			} finally {
-				if (ois != null) {
-					ois.close();
-				}
 			}
 		}
 	}
@@ -540,194 +517,4 @@ abstract class NettyMessage {
 		void readFrom(ByteBuf buffer) throws Exception {
 		}
 	}
-
-	// ------------------------------------------------------------------------
-
-	private static class ByteBufDataInputView implements DataInputView {
-
-		private final ByteBufInputStream inputView;
-
-		public ByteBufDataInputView(ByteBuf buffer) {
-			this.inputView = new ByteBufInputStream(buffer);
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public int read(byte[] b, int off, int len) throws IOException {
-			return inputView.read(b, off, len);
-		}
-
-		@Override
-		public int read(byte[] b) throws IOException {
-			return inputView.read(b);
-		}
-
-		@Override
-		public void readFully(byte[] b) throws IOException {
-			inputView.readFully(b);
-		}
-
-		@Override
-		public void readFully(byte[] b, int off, int len) throws IOException {
-			inputView.readFully(b, off, len);
-		}
-
-		@Override
-		public int skipBytes(int n) throws IOException {
-			return inputView.skipBytes(n);
-		}
-
-		@Override
-		public boolean readBoolean() throws IOException {
-			return inputView.readBoolean();
-		}
-
-		@Override
-		public byte readByte() throws IOException {
-			return inputView.readByte();
-		}
-
-		@Override
-		public int readUnsignedByte() throws IOException {
-			return inputView.readUnsignedByte();
-		}
-
-		@Override
-		public short readShort() throws IOException {
-			return inputView.readShort();
-		}
-
-		@Override
-		public int readUnsignedShort() throws IOException {
-			return inputView.readUnsignedShort();
-		}
-
-		@Override
-		public char readChar() throws IOException {
-			return inputView.readChar();
-		}
-
-		@Override
-		public int readInt() throws IOException {
-			return inputView.readInt();
-		}
-
-		@Override
-		public long readLong() throws IOException {
-			return inputView.readLong();
-		}
-
-		@Override
-		public float readFloat() throws IOException {
-			return inputView.readFloat();
-		}
-
-		@Override
-		public double readDouble() throws IOException {
-			return inputView.readDouble();
-		}
-
-		@Override
-		public String readLine() throws IOException {
-			return inputView.readLine();
-		}
-
-		@Override
-		public String readUTF() throws IOException {
-			return inputView.readUTF();
-		}
-	}
-
-	private static class ByteBufDataOutputView implements DataOutputView {
-
-		private final ByteBufOutputStream outputView;
-
-		public ByteBufDataOutputView(ByteBuf buffer) {
-			this.outputView = new ByteBufOutputStream(buffer);
-		}
-
-		@Override
-		public void skipBytesToWrite(int numBytes) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void write(DataInputView source, int numBytes) throws IOException {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			outputView.write(b);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			outputView.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			outputView.write(b, off, len);
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			outputView.writeBoolean(v);
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			outputView.writeByte(v);
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			outputView.writeShort(v);
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			outputView.writeChar(v);
-		}
-
-		@Override
-		public void writeInt(int v) throws IOException {
-			outputView.writeInt(v);
-		}
-
-		@Override
-		public void writeLong(long v) throws IOException {
-			outputView.writeLong(v);
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			outputView.writeFloat(v);
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			outputView.writeDouble(v);
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			outputView.writeBytes(s);
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			outputView.writeChars(s);
-		}
-
-		@Override
-		public void writeUTF(String s) throws IOException {
-			outputView.writeUTF(s);
-		}
-	}
 }