You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/03/15 15:00:46 UTC
flink git commit: [FLINK-6044] Replace all unintentional calls to InputStream#read(...) with InputStream#readFully(...)
Repository: flink
Updated Branches:
refs/heads/master 0bdc8bfd0 -> cbaf4e5e4
[FLINK-6044] Replace all unintentional calls to InputStream#read(...) with InputStream#readFully(...)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cbaf4e5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cbaf4e5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cbaf4e5e
Branch: refs/heads/master
Commit: cbaf4e5e4c862e489ac93ef8f7300e3cdeac86f3
Parents: 0bdc8bf
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Mar 14 14:45:00 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed Mar 15 15:59:48 2017 +0100
----------------------------------------------------------------------
.../typeutils/TypeSerializerSerializationProxy.java | 2 +-
.../api/common/typeutils/base/BigIntSerializer.java | 7 ++++---
.../core/memory/DataOutputViewStreamWrapper.java | 2 +-
.../src/main/java/org/apache/flink/types/Record.java | 14 +++++++-------
.../typeutils/runtime/TestDataOutputSerializer.java | 10 +++++-----
.../typeutils/runtime/kryo/KryoClearedBufferTest.java | 5 +----
.../flink/runtime/util/DataOutputSerializer.java | 2 +-
.../io/network/api/writer/RecordWriterTest.java | 2 +-
.../apache/flink/streaming/runtime/io/TestEvent.java | 2 +-
9 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
index cebd348..cb8967b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -94,7 +94,7 @@ public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWrit
// read in a way that allows the stream to recover from exceptions
int serializerBytes = in.readInt();
byte[] buffer = new byte[serializerBytes];
- in.read(buffer);
+ in.readFully(buffer);
try {
typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
} catch (ClassNotFoundException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
index 73b2f54..041165d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -18,12 +18,13 @@
package org.apache.flink.api.common.typeutils.base;
-import java.io.IOException;
-import java.math.BigInteger;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.math.BigInteger;
+
/**
* Serializer for serializing/deserializing BigInteger values including null values.
*/
@@ -130,7 +131,7 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger>
}
}
final byte[] bytes = new byte[len - 4];
- source.read(bytes);
+ source.readFully(bytes);
return new BigInteger(bytes);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
index 9ec9c29..4e45532 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
@@ -57,7 +57,7 @@ public class DataOutputViewStreamWrapper extends DataOutputStream implements Dat
while (numBytes > 0) {
int toCopy = Math.min(numBytes, tempBuffer.length);
- source.read(tempBuffer, 0, toCopy);
+ source.readFully(tempBuffer, 0, toCopy);
write(tempBuffer, 0, toCopy);
numBytes -= toCopy;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 9990ddf..c296751 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -19,6 +19,12 @@
package org.apache.flink.types;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.InstantiationUtil;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
@@ -27,12 +33,6 @@ import java.io.Serializable;
import java.io.UTFDataFormatException;
import java.nio.ByteOrder;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-import org.apache.flink.util.InstantiationUtil;
-
/**
* The Record represents a multi-valued data record.
@@ -1808,7 +1808,7 @@ public final class Record implements Value, CopyableValue<Record> {
throw new IOException("Could not write " + numBytes + " bytes since the buffer is full.");
}
- source.read(this.memory,this.position, numBytes);
+ source.readFully(this.memory,this.position, numBytes);
this.position += numBytes;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
index 87be6db..d830a21 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -18,16 +18,16 @@
package org.apache.flink.api.java.typeutils.runtime;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
public final class TestDataOutputSerializer implements DataOutputView {
private byte[] buffer;
@@ -301,7 +301,7 @@ public final class TestDataOutputSerializer implements DataOutputView {
throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
}
- source.read(this.buffer, this.position, numBytes);
+ source.readFully(this.buffer, this.position, numBytes);
this.position += numBytes;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index d85ff95..8b420bc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,11 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
-
import org.junit.Assert;
import org.junit.Test;
@@ -36,7 +34,6 @@ import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
public class KryoClearedBufferTest {
@@ -178,7 +175,7 @@ public class KryoClearedBufferTest {
byte[] tempBuffer = new byte[numBytes];
- source.read(tempBuffer);
+ source.readFully(tempBuffer);
System.arraycopy(tempBuffer, 0, buffer, position, numBytes);
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 18940ed..4f1cf77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -324,7 +324,7 @@ public class DataOutputSerializer implements DataOutputView {
throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
}
- source.read(this.buffer, this.position, numBytes);
+ source.readFully(this.buffer, this.position, numBytes);
this.position += numBytes;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 7d83fb5..98d4f65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -542,7 +542,7 @@ public class RecordWriterTest {
@Override
public void read(DataInputView in) throws IOException {
- in.read(bytes);
+ in.readFully(bytes);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cbaf4e5e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
index 286477a..9fcb7fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
@@ -58,7 +58,7 @@ public class TestEvent extends AbstractEvent {
public void read(DataInputView in) throws IOException {
this.magicNumber = in.readLong();
this.payload = new byte[in.readInt()];
- in.read(this.payload);
+ in.readFully(this.payload);
}
// ------------------------------------------------------------------------