You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/04/14 18:09:36 UTC

flink git commit: [FLINK-3760] Fix StateDescriptor.readObject

Repository: flink
Updated Branches:
  refs/heads/master 1a34f2165 -> 494212b37


[FLINK-3760] Fix StateDescriptor.readObject


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/494212b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/494212b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/494212b3

Branch: refs/heads/master
Commit: 494212b37b64b45cd777cc09fc6d3d3d8fbf5999
Parents: 1a34f21
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 14 16:10:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 14 18:09:14 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java | 34 +++++++++-----------
 .../common/state/ValueStateDescriptorTest.java  | 29 +++++++++++++++++
 2 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/494212b3/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 10ac5ba..243ebcd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -61,12 +61,12 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	/** The type information describing the value type. Only used to lazily create the serializer
 	 * and dropped during serialization */
 	private transient TypeInformation<T> typeInfo;
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Create a new {@code StateDescriptor} with the given name and the given type serializer.
-	 * 
+	 *
 	 * @param name The name of the {@code StateDescriptor}.
 	 * @param serializer The type serializer for the values in the state.
 	 * @param defaultValue The default value that will be set when requesting state without setting
@@ -94,7 +94,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 
 	/**
 	 * Create a new {@code StateDescriptor} with the given name and the given type information.
-	 * 
+	 *
 	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
 	 * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
 	 *
@@ -106,7 +106,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	protected StateDescriptor(String name, Class<T> type, T defaultValue) {
 		this.name = requireNonNull(name, "name must not be null");
 		requireNonNull(type, "type class must not be null");
-		
+
 		try {
 			this.typeInfo = TypeExtractor.createTypeInfo(type);
 		} catch (Exception e) {
@@ -117,7 +117,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the name of this {@code StateDescriptor}.
 	 */
@@ -152,21 +152,21 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 			throw new IllegalStateException("Serializer not yet initialized.");
 		}
 	}
-	
+
 	/**
 	 * Creates a new {@link State} on the given {@link StateBackend}.
 	 *
 	 * @param stateBackend The {@code StateBackend} on which to create the {@link State}.
 	 */
 	public abstract S bind(StateBackend stateBackend) throws Exception;
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Checks whether the serializer has been initialized. Serializer initialization is lazy,
 	 * to allow parametrization of serializers with an {@link ExecutionConfig} via
 	 * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
-	 * 
+	 *
 	 * @return True if the serializers have been initialized, false otherwise.
 	 */
 	public boolean isSerializerInitialized() {
@@ -175,7 +175,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 
 	/**
 	 * Initializes the serializer, unless it has been initialized before.
-	 * 
+	 *
 	 * @param executionConfig The execution config to use when creating the serializer.
 	 */
 	public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
@@ -188,7 +188,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 			}
 		}
 	}
-	
+
 	/**
 	 * This method should be called by subclasses prior to serialization. Because the TypeInformation is
 	 * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
@@ -204,7 +204,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Standard Utils
 	// ------------------------------------------------------------------------
@@ -230,7 +230,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 
 	@Override
 	public String toString() {
-		return getClass().getSimpleName() + 
+		return getClass().getSimpleName() +
 				"{name=" + name +
 				", defaultValue=" + defaultValue +
 				", serializer=" + serializer +
@@ -257,7 +257,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 			out.writeBoolean(true);
 
 			byte[] serializedDefaultValue;
-			try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
+			try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
 					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
 			{
 				TypeSerializer<T> duplicateSerializer = serializer.duplicate();
@@ -284,12 +284,10 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 		boolean hasDefaultValue = in.readBoolean();
 		if (hasDefaultValue) {
 			int size = in.readInt();
+
 			byte[] buffer = new byte[size];
-			int bytesRead = in.read(buffer);
 
-			if (bytesRead != size) {
-				throw new RuntimeException("Read size does not match expected size.");
-			}
+			in.readFully(buffer);
 
 			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
 					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))

http://git-wip-us.apache.org/repos/asf/flink/blob/494212b3/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index d03cc47..655ffd5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -101,4 +101,33 @@ public class ValueStateDescriptorTest {
 		assertNotNull(copy.getSerializer());
 		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
 	}
+
+	@Test
+	public void testVeryLargeDefaultValue() throws Exception {
+		// ensure that readObject reads the complete default value even when it is too large for a single read() call
+
+		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		byte[] data = new byte[200000];
+		for (int i = 0; i < 200000; i++) {
+			data[i] = 65;
+		}
+		data[199000] = '\0';
+
+		String defaultValue = new String(data);
+
+		ValueStateDescriptor<String> descr =
+				new ValueStateDescriptor<String>("testName", serializer, defaultValue);
+
+		assertEquals("testName", descr.getName());
+		assertEquals(defaultValue, descr.getDefaultValue());
+		assertNotNull(descr.getSerializer());
+		assertEquals(serializer, descr.getSerializer());
+
+		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertEquals(defaultValue, copy.getDefaultValue());
+		assertNotNull(copy.getSerializer());
+		assertEquals(serializer, copy.getSerializer());
+	}
 }