You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/26 10:36:44 UTC
[2/2] flink git commit: [hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames
[hotfix] Fix unnecessary stream wrapping for Netty Error Message Frames
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d46c9d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d46c9d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d46c9d3
Branch: refs/heads/master
Commit: 1d46c9d3623519e92b9cd7f147f15cbde527fb65
Parents: 1db14fc
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:56:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 26 12:36:12 2016 +0200
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyMessage.java | 233 +------------------
1 file changed, 10 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1d46c9d3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2b03f1d..b97bd82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -28,10 +28,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -289,17 +286,9 @@ abstract class NettyMessage {
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
- ByteBuf result = null;
-
- ObjectOutputStream oos = null;
-
- try {
- result = allocateBuffer(allocator, ID);
-
- DataOutputView outputView = new ByteBufDataOutputView(result);
-
- oos = new ObjectOutputStream(new DataOutputViewStream(outputView));
+ final ByteBuf result = allocateBuffer(allocator, ID);
+ try (ObjectOutputStream oos = new ObjectOutputStream(new ByteBufOutputStream(result))) {
oos.writeObject(cause);
if (receiverId != null) {
@@ -311,30 +300,22 @@ abstract class NettyMessage {
// Update frame length...
result.setInt(0, result.readableBytes());
+ return result;
}
catch (Throwable t) {
- if (result != null) {
- result.release();
- }
+ result.release();
- throw new IOException(t);
- } finally {
- if(oos != null) {
- oos.close();
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ } else {
+ throw new IOException(t);
}
}
-
- return result;
}
@Override
void readFrom(ByteBuf buffer) throws Exception {
- DataInputView inputView = new ByteBufDataInputView(buffer);
- ObjectInputStream ois = null;
-
- try {
- ois = new ObjectInputStream(new DataInputViewStream(inputView));
-
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer))) {
Object obj = ois.readObject();
if (!(obj instanceof Throwable)) {
@@ -347,10 +328,6 @@ abstract class NettyMessage {
receiverId = InputChannelID.fromByteBuf(buffer);
}
}
- } finally {
- if (ois != null) {
- ois.close();
- }
}
}
}
@@ -540,194 +517,4 @@ abstract class NettyMessage {
void readFrom(ByteBuf buffer) throws Exception {
}
}
-
- // ------------------------------------------------------------------------
-
- private static class ByteBufDataInputView implements DataInputView {
-
- private final ByteBufInputStream inputView;
-
- public ByteBufDataInputView(ByteBuf buffer) {
- this.inputView = new ByteBufInputStream(buffer);
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return inputView.read(b, off, len);
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return inputView.read(b);
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- inputView.readFully(b);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- inputView.readFully(b, off, len);
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- return inputView.skipBytes(n);
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- return inputView.readBoolean();
- }
-
- @Override
- public byte readByte() throws IOException {
- return inputView.readByte();
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return inputView.readUnsignedByte();
- }
-
- @Override
- public short readShort() throws IOException {
- return inputView.readShort();
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return inputView.readUnsignedShort();
- }
-
- @Override
- public char readChar() throws IOException {
- return inputView.readChar();
- }
-
- @Override
- public int readInt() throws IOException {
- return inputView.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return inputView.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return inputView.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return inputView.readDouble();
- }
-
- @Override
- public String readLine() throws IOException {
- return inputView.readLine();
- }
-
- @Override
- public String readUTF() throws IOException {
- return inputView.readUTF();
- }
- }
-
- private static class ByteBufDataOutputView implements DataOutputView {
-
- private final ByteBufOutputStream outputView;
-
- public ByteBufDataOutputView(ByteBuf buffer) {
- this.outputView = new ByteBufOutputStream(buffer);
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void write(DataInputView source, int numBytes) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void write(int b) throws IOException {
- outputView.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- outputView.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- outputView.write(b, off, len);
- }
-
- @Override
- public void writeBoolean(boolean v) throws IOException {
- outputView.writeBoolean(v);
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- outputView.writeByte(v);
- }
-
- @Override
- public void writeShort(int v) throws IOException {
- outputView.writeShort(v);
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- outputView.writeChar(v);
- }
-
- @Override
- public void writeInt(int v) throws IOException {
- outputView.writeInt(v);
- }
-
- @Override
- public void writeLong(long v) throws IOException {
- outputView.writeLong(v);
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- outputView.writeFloat(v);
- }
-
- @Override
- public void writeDouble(double v) throws IOException {
- outputView.writeDouble(v);
- }
-
- @Override
- public void writeBytes(String s) throws IOException {
- outputView.writeBytes(s);
- }
-
- @Override
- public void writeChars(String s) throws IOException {
- outputView.writeChars(s);
- }
-
- @Override
- public void writeUTF(String s) throws IOException {
- outputView.writeUTF(s);
- }
- }
}