You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/14 18:09:36 UTC
flink git commit: [FLINK-3760] Fix StateDescriptor.readObject
Repository: flink
Updated Branches:
refs/heads/master 1a34f2165 -> 494212b37
[FLINK-3760] Fix StateDescriptor.readObject
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/494212b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/494212b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/494212b3
Branch: refs/heads/master
Commit: 494212b37b64b45cd777cc09fc6d3d3d8fbf5999
Parents: 1a34f21
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 14 16:10:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 14 18:09:14 2016 +0200
----------------------------------------------------------------------
.../flink/api/common/state/StateDescriptor.java | 34 +++++++++-----------
.../common/state/ValueStateDescriptorTest.java | 29 +++++++++++++++++
2 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/494212b3/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 10ac5ba..243ebcd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -61,12 +61,12 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
/** The type information describing the value type. Only used to lazily create the serializer
* and dropped during serialization */
private transient TypeInformation<T> typeInfo;
-
+
// ------------------------------------------------------------------------
-
+
/**
* Create a new {@code StateDescriptor} with the given name and the given type serializer.
- *
+ *
* @param name The name of the {@code StateDescriptor}.
* @param serializer The type serializer for the values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
@@ -94,7 +94,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
- *
+ *
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
*
@@ -106,7 +106,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
protected StateDescriptor(String name, Class<T> type, T defaultValue) {
this.name = requireNonNull(name, "name must not be null");
requireNonNull(type, "type class must not be null");
-
+
try {
this.typeInfo = TypeExtractor.createTypeInfo(type);
} catch (Exception e) {
@@ -117,7 +117,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
}
// ------------------------------------------------------------------------
-
+
/**
* Returns the name of this {@code StateDescriptor}.
*/
@@ -152,21 +152,21 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
throw new IllegalStateException("Serializer not yet initialized.");
}
}
-
+
/**
* Creates a new {@link State} on the given {@link StateBackend}.
*
* @param stateBackend The {@code StateBackend} on which to create the {@link State}.
*/
public abstract S bind(StateBackend stateBackend) throws Exception;
-
+
// ------------------------------------------------------------------------
/**
* Checks whether the serializer has been initialized. Serializer initialization is lazy,
* to allow parametrization of serializers with an {@link ExecutionConfig} via
* {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
- *
+ *
* @return True if the serializers have been initialized, false otherwise.
*/
public boolean isSerializerInitialized() {
@@ -175,7 +175,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
/**
* Initializes the serializer, unless it has been initialized before.
- *
+ *
* @param executionConfig The execution config to use when creating the serializer.
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
@@ -188,7 +188,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
}
}
}
-
+
/**
* This method should be called by subclasses prior to serialization. Because the TypeInformation is
* not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
@@ -204,7 +204,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
}
}
}
-
+
// ------------------------------------------------------------------------
// Standard Utils
// ------------------------------------------------------------------------
@@ -230,7 +230,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
@Override
public String toString() {
- return getClass().getSimpleName() +
+ return getClass().getSimpleName() +
"{name=" + name +
", defaultValue=" + defaultValue +
", serializer=" + serializer +
@@ -257,7 +257,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
out.writeBoolean(true);
byte[] serializedDefaultValue;
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
{
TypeSerializer<T> duplicateSerializer = serializer.duplicate();
@@ -284,12 +284,10 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
boolean hasDefaultValue = in.readBoolean();
if (hasDefaultValue) {
int size = in.readInt();
+
byte[] buffer = new byte[size];
- int bytesRead = in.read(buffer);
- if (bytesRead != size) {
- throw new RuntimeException("Read size does not match expected size.");
- }
+ in.readFully(buffer);
try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
http://git-wip-us.apache.org/repos/asf/flink/blob/494212b3/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index d03cc47..655ffd5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -101,4 +101,33 @@ public class ValueStateDescriptorTest {
assertNotNull(copy.getSerializer());
assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
}
+
+ @Test
+ public void testVeryLargeDefaultValue() throws Exception {
+ // ensure that we correctly read very large data when deserializing the default value
+
+ TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+ byte[] data = new byte[200000];
+ for (int i = 0; i < 200000; i++) {
+ data[i] = 65;
+ }
+ data[199000] = '\0';
+
+ String defaultValue = new String(data);
+
+ ValueStateDescriptor<String> descr =
+ new ValueStateDescriptor<String>("testName", serializer, defaultValue);
+
+ assertEquals("testName", descr.getName());
+ assertEquals(defaultValue, descr.getDefaultValue());
+ assertNotNull(descr.getSerializer());
+ assertEquals(serializer, descr.getSerializer());
+
+ ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+ assertEquals("testName", copy.getName());
+ assertEquals(defaultValue, copy.getDefaultValue());
+ assertNotNull(copy.getSerializer());
+ assertEquals(serializer, copy.getSerializer());
+ }
}