You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/15 15:00:46 UTC

flink git commit: [FLINK-6044] Replace all unintentional calls to InputStream#read(...) with InputStream#readFully(...)

Repository: flink
Updated Branches:
  refs/heads/master 0bdc8bfd0 -> cbaf4e5e4


[FLINK-6044] Replace all unintentional calls to InputStream#read(...) with InputStream#readFully(...)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cbaf4e5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cbaf4e5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cbaf4e5e

Branch: refs/heads/master
Commit: cbaf4e5e4c862e489ac93ef8f7300e3cdeac86f3
Parents: 0bdc8bf
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 14 14:45:00 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Mar 15 15:59:48 2017 +0100

----------------------------------------------------------------------
 .../typeutils/TypeSerializerSerializationProxy.java   |  2 +-
 .../api/common/typeutils/base/BigIntSerializer.java   |  7 ++++---
 .../core/memory/DataOutputViewStreamWrapper.java      |  2 +-
 .../src/main/java/org/apache/flink/types/Record.java  | 14 +++++++-------
 .../typeutils/runtime/TestDataOutputSerializer.java   | 10 +++++-----
 .../typeutils/runtime/kryo/KryoClearedBufferTest.java |  5 +----
 .../flink/runtime/util/DataOutputSerializer.java      |  2 +-
 .../io/network/api/writer/RecordWriterTest.java       |  2 +-
 .../apache/flink/streaming/runtime/io/TestEvent.java  |  2 +-
 9 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
index cebd348..cb8967b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -94,7 +94,7 @@ public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWrit
 		// read in a way that allows the stream to recover from exceptions
 		int serializerBytes = in.readInt();
 		byte[] buffer = new byte[serializerBytes];
-		in.read(buffer);
+		in.readFully(buffer);
 		try {
 			typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
 		} catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
index 73b2f54..041165d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import java.io.IOException;
-import java.math.BigInteger;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.IOException;
+import java.math.BigInteger;
+
 /**
  * Serializer for serializing/deserializing BigInteger values including null values.
  */
@@ -130,7 +131,7 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger>
 			}
 		}
 		final byte[] bytes = new byte[len - 4];
-		source.read(bytes);
+		source.readFully(bytes);
 		return new BigInteger(bytes);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
index 9ec9c29..4e45532 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
@@ -57,7 +57,7 @@ public class DataOutputViewStreamWrapper extends DataOutputStream implements Dat
 		
 		while (numBytes > 0) {
 			int toCopy = Math.min(numBytes, tempBuffer.length);
-			source.read(tempBuffer, 0, toCopy);
+			source.readFully(tempBuffer, 0, toCopy);
 			write(tempBuffer, 0, toCopy);
 			numBytes -= toCopy;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 9990ddf..c296751 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -19,6 +19,12 @@
 
 package org.apache.flink.types;
 
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.InstantiationUtil;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
@@ -27,12 +33,6 @@ import java.io.Serializable;
 import java.io.UTFDataFormatException;
 import java.nio.ByteOrder;
 
-import org.apache.flink.annotation.Public;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.util.InstantiationUtil;
-
 
 /**
  * The Record represents a multi-valued data record.
@@ -1808,7 +1808,7 @@ public final class Record implements Value, CopyableValue<Record> {
 				throw new IOException("Could not write " + numBytes + " bytes since the buffer is full.");
 			}
 
-			source.read(this.memory,this.position, numBytes);
+			source.readFully(this.memory,this.position, numBytes);
 			this.position += numBytes;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
index 87be6db..d830a21 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
 public final class TestDataOutputSerializer implements DataOutputView {
 	
 	private byte[] buffer;
@@ -301,7 +301,7 @@ public final class TestDataOutputSerializer implements DataOutputView {
 			throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
 		}
 
-		source.read(this.buffer, this.position, numBytes);
+		source.readFully(this.buffer, this.position, numBytes);
 		this.position += numBytes;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index d85ff95..8b420bc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,11 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,7 +34,6 @@ import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 public class KryoClearedBufferTest {
@@ -178,7 +175,7 @@ public class KryoClearedBufferTest {
 
 			byte[] tempBuffer = new byte[numBytes];
 
-			source.read(tempBuffer);
+			source.readFully(tempBuffer);
 
 			System.arraycopy(tempBuffer, 0, buffer, position, numBytes);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 18940ed..4f1cf77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -324,7 +324,7 @@ public class DataOutputSerializer implements DataOutputView {
 			throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
 		}
 
-		source.read(this.buffer, this.position, numBytes);
+		source.readFully(this.buffer, this.position, numBytes);
 		this.position += numBytes;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 7d83fb5..98d4f65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -542,7 +542,7 @@ public class RecordWriterTest {
 
 		@Override
 		public void read(DataInputView in) throws IOException {
-			in.read(bytes);
+			in.readFully(bytes);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
index 286477a..9fcb7fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
@@ -58,7 +58,7 @@ public class TestEvent extends AbstractEvent {
 	public void read(DataInputView in) throws IOException {
 		this.magicNumber = in.readLong();
 		this.payload = new byte[in.readInt()];
-		in.read(this.payload);
+		in.readFully(this.payload);
 	}
 
 	// ------------------------------------------------------------------------