Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/07 06:26:31 UTC

[1/8] flink git commit: [FLINK-9169] [runtime] Always use dummy serializer instead of null when old state serializer cannot be read

Repository: flink
Updated Branches:
  refs/heads/master a763d3342 -> 7dbc8ac6a


[FLINK-9169] [runtime] Always use dummy serializer instead of null when old state serializer cannot be read

Previously, the behaviour of the TypeSerializerSerializationUtil read
methods when a serializer could not be read was inconsistent.
For some exceptions (e.g. ClassNotFoundException,
InvalidClassException), a dummy serializer was used as a
replacement. In other cases, null was returned.

This commit fixes this by always using dummy serializers if a
'useDummyPlaceholder' flag is set to true. Otherwise, an IOException is
thrown. This makes it clear that users should use dummy serializers if
they want the deserialization to be tolerant to failures.

Another benefit of this is that there will no longer be 'null'
serializers after restore; they will either be an actual serializer, or
a dummy if the old serializer cannot be restored.
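
The read contract described above can be sketched outside of Flink with
plain Java serialization. This is only an illustration of the pattern,
not the code from this commit: the names TolerantReadSketch, Placeholder,
UnreadableObjectException and readOrPlaceholder are hypothetical
stand-ins for TypeSerializerSerializationUtil,
UnloadableDummyTypeSerializer, UnloadableTypeSerializerException and
tryReadSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical sketch of the "placeholder or IOException, never null" contract.
final class TolerantReadSketch {

    // Stand-in for a dummy serializer: it only keeps the unreadable bytes.
    static final class Placeholder {
        final byte[] originalBytes;

        Placeholder(byte[] originalBytes) {
            this.originalBytes = originalBytes;
        }
    }

    // Stand-in for an exception that carries both the cause and the raw bytes.
    static final class UnreadableObjectException extends IOException {
        private static final long serialVersionUID = 1L;

        private final byte[] originalBytes;

        UnreadableObjectException(Exception cause, byte[] originalBytes) {
            super(cause);
            this.originalBytes = originalBytes;
        }

        byte[] getOriginalBytes() {
            return originalBytes;
        }
    }

    // Low-level read: any failure is wrapped together with the original bytes.
    private static Object read(byte[] bytes) throws UnreadableObjectException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (Exception e) {
            throw new UnreadableObjectException(e, bytes);
        }
    }

    // Caller-facing read: returns a placeholder if requested, otherwise rethrows.
    static Object readOrPlaceholder(byte[] bytes, boolean usePlaceholder) throws IOException {
        try {
            return read(bytes);
        } catch (UnreadableObjectException e) {
            if (usePlaceholder) {
                return new Placeholder(e.getOriginalBytes());
            }
            throw e;
        }
    }

    // Helper used only to produce valid input for the demonstration.
    static byte[] write(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] corrupted = {1, 2, 3};
        System.out.println(readOrPlaceholder(write("state"), false));
        System.out.println(readOrPlaceholder(corrupted, true) instanceof Placeholder);
    }
}
```

Keeping the flag check in the outer method, and having the inner read
always throw, mirrors the split this commit makes between the proxy's
read and tryReadSerializer: the caller decides whether a failure is
tolerated, and no code path returns null.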


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c541a1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c541a1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c541a1f

Branch: refs/heads/master
Commit: 7c541a1fa82546c29616a83897d6f5e347fb5840
Parents: a763d33
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 2 13:37:22 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 09:38:29 2018 +0800

----------------------------------------------------------------------
 .../TypeSerializerSerializationUtil.java        | 58 ++++++-------
 .../UnloadableTypeSerializerException.java      | 51 +++++++++++
 .../java/typeutils/runtime/PojoSerializer.java  |  6 +-
 .../TypeSerializerSerializationUtilTest.java    | 56 ++++++------
 .../typeutils/runtime/PojoSerializerTest.java   | 41 ++++-----
 ...tificialCNFExceptionThrowingClassLoader.java | 42 +++++++++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  4 +-
 .../operators/testutils/DummyEnvironment.java   | 12 ++-
 .../runtime/state/MemoryStateBackendTest.java   | 39 ++++-----
 .../runtime/state/OperatorStateBackendTest.java | 24 ++----
 .../runtime/state/SerializationProxiesTest.java | 90 ++++++++++----------
 .../runtime/state/StateBackendTestBase.java     |  1 +
 12 files changed, 247 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 3f82530..e83b8c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -69,8 +69,7 @@ public class TypeSerializerSerializationUtil {
 	 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
 	 *
 	 * <p>If deserialization fails for any reason (corrupted serializer bytes, serializer class
-	 * no longer in classpath, serializer class no longer valid, etc.), {@code null} will
-	 * be returned instead.
+	 * no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
 	 *
 	 * @param in the data input view.
 	 * @param userCodeClassLoader the user code class loader to use.
@@ -79,7 +78,7 @@ public class TypeSerializerSerializationUtil {
 	 *
 	 * @return the deserialized serializer.
 	 */
-	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader) {
+	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
 		return tryReadSerializer(in, userCodeClassLoader, false);
 	}
 
@@ -87,10 +86,8 @@ public class TypeSerializerSerializationUtil {
 	 * Reads from a data input view a {@link TypeSerializer} that was previously
 	 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
 	 *
-	 * <p>If deserialization fails due to {@link ClassNotFoundException} or {@link InvalidClassException},
-	 * users can opt to use a dummy {@link UnloadableDummyTypeSerializer} to hold the serializer bytes,
-	 * otherwise {@code null} is returned. If the failure is due to a {@link java.io.StreamCorruptedException},
-	 * then {@code null} is returned.
+	 * <p>If deserialization fails due to any exception, users can opt to use a dummy
+	 * {@link UnloadableDummyTypeSerializer} to hold the serializer bytes, otherwise an {@link IOException} is thrown.
 	 *
 	 * @param in the data input view.
 	 * @param userCodeClassLoader the user code class loader to use.
@@ -102,17 +99,24 @@ public class TypeSerializerSerializationUtil {
 	 *
 	 * @return the deserialized serializer.
 	 */
-	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
+	public static <T> TypeSerializer<T> tryReadSerializer(
+			DataInputView in,
+			ClassLoader userCodeClassLoader,
+			boolean useDummyPlaceholder) throws IOException {
+
 		final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
-			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
+			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
 		try {
 			proxy.read(in);
 			return proxy.getTypeSerializer();
-		} catch (IOException e) {
-			LOG.warn("Deserialization of serializer errored; replacing with null.", e);
-
-			return null;
+		} catch (UnloadableTypeSerializerException e) {
+			if (useDummyPlaceholder) {
+				LOG.warn("Could not read a requested serializer. Replaced with a UnloadableDummyTypeSerializer.", e.getCause());
+				return new UnloadableDummyTypeSerializer<>(e.getSerializerBytes());
+			} else {
+				throw e;
+			}
 		}
 	}
 
@@ -161,8 +165,9 @@ public class TypeSerializerSerializationUtil {
 	/**
 	 * Reads from a data input view a list of serializers and their corresponding config snapshots
 	 * written using {@link #writeSerializersAndConfigsWithResilience(DataOutputView, List)}.
-	 * This is fault tolerant to any failures when deserializing the serializers. Serializers which
-	 * were not successfully deserialized will be replaced by {@code null}.
+	 *
+	 * <p>If deserialization for serializers fails due to any exception, users can opt to use a dummy
+	 * {@link UnloadableDummyTypeSerializer} to hold the serializer bytes
 	 *
 	 * @param in the data input view.
 	 * @param userCodeClassLoader the user code class loader to use.
@@ -200,7 +205,7 @@ public class TypeSerializerSerializationUtil {
 			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
 
 				bufferWithPos.setPosition(offsets[i * 2]);
-				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
+				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
 
 				bufferWithPos.setPosition(offsets[i * 2 + 1]);
 				configSnapshot = readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
@@ -315,20 +320,13 @@ public class TypeSerializerSerializationUtil {
 
 		private ClassLoader userClassLoader;
 		private TypeSerializer<T> typeSerializer;
-		private boolean useDummyPlaceholder;
-
-		public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean useDummyPlaceholder) {
-			this.userClassLoader = userClassLoader;
-			this.useDummyPlaceholder = useDummyPlaceholder;
-		}
 
 		public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
-			this(userClassLoader, false);
+			this.userClassLoader = userClassLoader;
 		}
 
 		public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
 			this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
-			this.useDummyPlaceholder = false;
 		}
 
 		public TypeSerializer<T> getTypeSerializer() {
@@ -373,16 +371,8 @@ public class TypeSerializerSerializationUtil {
 
 				Thread.currentThread().setContextClassLoader(userClassLoader);
 				typeSerializer = (TypeSerializer<T>) ois.readObject();
-			} catch (ClassNotFoundException | InvalidClassException e) {
-				if (useDummyPlaceholder) {
-					// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
-					// a proper typeserializer from the user
-					typeSerializer =
-						new UnloadableDummyTypeSerializer<>(buffer);
-					LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
-				} else {
-					throw new IOException("Unloadable class for type serializer.", e);
-				}
+			} catch (Exception e) {
+				throw new UnloadableTypeSerializerException(e, buffer);
 			} finally {
 				Thread.currentThread().setContextClassLoader(previousClassLoader);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableTypeSerializerException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableTypeSerializerException.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableTypeSerializerException.java
new file mode 100644
index 0000000..467db6b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableTypeSerializerException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An exception thrown to indicate that a serializer cannot be read.
+ * It wraps the cause of the read error, as well as the original bytes of the written serializer.
+ */
+@Internal
+public class UnloadableTypeSerializerException extends IOException {
+
+	private static final long serialVersionUID = 4500388174107930407L;
+
+	private final byte[] serializerBytes;
+
+	/**
+	 * Creates a new exception, with the cause of the read error and the original serializer bytes.
+	 *
+	 * @param cause the cause of the read error.
+	 * @param serializerBytes the original serializer bytes.
+	 */
+	public UnloadableTypeSerializerException(Exception cause, byte[] serializerBytes) {
+		super(cause);
+		this.serializerBytes = Preconditions.checkNotNull(serializerBytes);
+	}
+
+	public byte[] getSerializerBytes() {
+		return serializerBytes;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 6a67428..a4d086d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -924,7 +924,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					fieldName = inViewWrapper.readUTF();
 
 					inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
 					fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
@@ -950,7 +950,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 
 					inWithPos.setPosition(registeredSerializerOffsets[i * 2]);
-					registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+					registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]);
 					registeredSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
@@ -976,7 +976,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 					}
 
 					inWithPos.setPosition(cachedSerializerOffsets[i * 2]);
-					cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+					cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader(), true);
 
 					inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]);
 					cachedSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 10df619..0473931 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -27,15 +27,12 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -47,19 +44,17 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
 /**
  * Unit tests for {@link TypeSerializerSerializationUtil}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class TypeSerializerSerializationUtilTest implements Serializable {
 
 	@ClassRule
@@ -107,13 +102,11 @@ public class TypeSerializerSerializationUtilTest implements Serializable {
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
-				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
-		}
-		Assert.assertEquals(null, deserializedSerializer);
-
-		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
-			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
-				new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), true);
+				new DataInputViewStreamWrapper(in),
+				new ArtificialCNFExceptionThrowingClassLoader(
+					Thread.currentThread().getContextClassLoader(),
+					Collections.singleton(IntSerializer.class.getName())),
+				true);
 		}
 		Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
 
@@ -139,17 +132,15 @@ public class TypeSerializerSerializationUtilTest implements Serializable {
 
 		TypeSerializer<?> deserializedSerializer;
 
-		// mock failure when deserializing serializers
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
-
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+				new DataInputViewStreamWrapper(in),
+				new ArtificialCNFExceptionThrowingClassLoader(
+					Thread.currentThread().getContextClassLoader(),
+					Collections.singleton(IntSerializer.class.getName())),
+				true);
 		}
-		Assert.assertEquals(null, deserializedSerializer);
+		Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
 	}
 
 	/**
@@ -225,22 +216,23 @@ public class TypeSerializerSerializationUtilTest implements Serializable {
 			serializedSerializersAndConfigs = out.toByteArray();
 		}
 
-		// mock failure when deserializing serializers
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		Set<String> cnfThrowingClassnames = new HashSet<>();
+		cnfThrowingClassnames.add(IntSerializer.class.getName());
+		cnfThrowingClassnames.add(DoubleSerializer.class.getName());
 
 		List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored;
 		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) {
 			restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
-				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+				new DataInputViewStreamWrapper(in),
+				new ArtificialCNFExceptionThrowingClassLoader(
+					Thread.currentThread().getContextClassLoader(),
+					cnfThrowingClassnames));
 		}
 
 		Assert.assertEquals(2, restored.size());
-		Assert.assertEquals(null, restored.get(0).f0);
+		Assert.assertTrue(restored.get(0).f0 instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1);
-		Assert.assertEquals(null, restored.get(1).f0);
+		Assert.assertTrue(restored.get(1).f0 instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index e5315aa..bdb3f8f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -20,16 +20,17 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,34 +43,32 @@ import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DateSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
 /**
  * A test for the {@link PojoSerializer}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> {
 	private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class);
 
@@ -557,18 +556,22 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			serializedConfig = out.toByteArray();
 		}
 
-		// mock failure when deserializing serializers
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-			mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		Set<String> cnfThrowingClassnames = new HashSet<>();
+		cnfThrowingClassnames.add(IntSerializer.class.getName());
+		cnfThrowingClassnames.add(IntPrimitiveArraySerializer.class.getName());
+		cnfThrowingClassnames.add(StringSerializer.class.getName());
+		cnfThrowingClassnames.add(DateSerializer.class.getName());
+		cnfThrowingClassnames.add(DoubleSerializer.class.getName());
 
 		// read configuration from bytes
 		PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> deserializedConfig;
 		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
 			deserializedConfig = (PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>)
 				TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
-					new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+					new DataInputViewStreamWrapper(in),
+					new ArtificialCNFExceptionThrowingClassLoader(
+						Thread.currentThread().getContextClassLoader(),
+						cnfThrowingClassnames));
 		}
 
 		Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
@@ -584,7 +587,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
-			Assert.assertEquals(null, entry.getValue().f0);
+			Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
 
 			if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
 				verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
@@ -601,7 +604,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 				: deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
 
-			Assert.assertEquals(null, entry.getValue().f0);
+			Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
 
 			if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
 				verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-core/src/test/java/org/apache/flink/testutils/ArtificialCNFExceptionThrowingClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/ArtificialCNFExceptionThrowingClassLoader.java b/flink-core/src/test/java/org/apache/flink/testutils/ArtificialCNFExceptionThrowingClassLoader.java
new file mode 100644
index 0000000..0843785
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ArtificialCNFExceptionThrowingClassLoader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils;
+
+import java.util.Set;
+
+/**
+ * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
+ */
+public class ArtificialCNFExceptionThrowingClassLoader extends ClassLoader {
+
+	private final Set<String> cnfThrowingClassnames;
+
+	public ArtificialCNFExceptionThrowingClassLoader(ClassLoader parent, Set<String> cnfThrowingClassnames) {
+		super(parent);
+		this.cnfThrowingClassnames = cnfThrowingClassnames;
+	}
+
+	@Override
+	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+		if (cnfThrowingClassnames.contains(name)) {
+			throw new ClassNotFoundException();
+		} else {
+			return super.loadClass(name, resolve);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index d4244e0..f992341 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -170,8 +170,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
 			metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
 			metaInfo.setName(in.readUTF());
 
-			metaInfo.setNamespaceSerializer(TypeSerializerSerializationUtil.<N>tryReadSerializer(in, userCodeClassLoader));
-			metaInfo.setStateSerializer(TypeSerializerSerializationUtil.<S>tryReadSerializer(in, userCodeClassLoader));
+			metaInfo.setNamespaceSerializer(TypeSerializerSerializationUtil.<N>tryReadSerializer(in, userCodeClassLoader, true));
+			metaInfo.setStateSerializer(TypeSerializerSerializationUtil.<S>tryReadSerializer(in, userCodeClassLoader, true));
 
 			// older versions do not contain the configuration snapshot
 			metaInfo.setNamespaceSerializerConfigSnapshot(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 7d1a777..353bcad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -59,11 +59,17 @@ public class DummyEnvironment implements Environment {
 	private KvStateRegistry kvStateRegistry = new KvStateRegistry();
 	private TaskStateManager taskStateManager;
 	private final AccumulatorRegistry accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
+	private ClassLoader userClassLoader;
 
 	public DummyEnvironment() {
 		this("Test Job", 1, 0, 1);
 	}
 
+	public DummyEnvironment(ClassLoader userClassLoader) {
+		this("Test Job", 1, 0, 1);
+		this.userClassLoader = userClassLoader;
+	}
+
 	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
 		this(taskName, numSubTasks, subTaskIndex, numSubTasks);
 	}
@@ -143,7 +149,11 @@ public class DummyEnvironment implements Environment {
 
 	@Override
 	public ClassLoader getUserClassLoader() {
-		return getClass().getClassLoader();
+		if (userClassLoader == null) {
+			return getClass().getClassLoader();
+		} else {
+			return userClassLoader;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 23493b5..a13bc1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -23,39 +23,31 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBackend> {
 
 	@Override
@@ -131,16 +123,15 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 			operatorStateBackend.close();
 			operatorStateBackend.dispose();
 
+			env = new DummyEnvironment(
+				new ArtificialCNFExceptionThrowingClassLoader(
+					getClass().getClassLoader(),
+					Collections.singleton(JavaSerializer.class.getName())));
+
 			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
 				env,
 				"testOperator");
 
-			// mock failure when deserializing serializer
-			TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-					mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-			PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
-
 			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
 			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
@@ -184,14 +175,14 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 
 		// ========== restore snapshot ==========
 
-		// mock failure when deserializing serializer
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
-
 		try {
-			restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, new DummyEnvironment());
+			restoreKeyedBackend(
+				IntSerializer.INSTANCE,
+				snapshot,
+				new DummyEnvironment(
+					new ArtificialCNFExceptionThrowingClassLoader(
+						getClass().getClassLoader(),
+						Collections.singleton(StringSerializer.class.getName()))));
 
 			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
 		} catch (IOException expected) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 16e5188..d8918e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -26,34 +26,31 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -72,13 +69,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({TypeSerializerSerializationUtil.class, IntSerializer.class})
 public class OperatorStateBackendTest {
 
 	private final ClassLoader classLoader = getClass().getClassLoader();
@@ -889,15 +882,12 @@ public class OperatorStateBackendTest {
 			operatorStateBackend.dispose();
 
 			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
-				createMockEnvironment(),
+				new DummyEnvironment(
+					new ArtificialCNFExceptionThrowingClassLoader(
+						getClass().getClassLoader(),
+						Collections.singleton(JavaSerializer.class.getName()))),
 				"testOperator");
 
-			// mock failure when deserializing serializer
-			TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-					mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-			doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-			PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
-
 			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
 			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 57e4aed..5176fed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -29,24 +29,16 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(TypeSerializerSerializationUtil.class)
 public class SerializationProxiesTest {
 
 	@Test
@@ -112,26 +104,28 @@ public class SerializationProxiesTest {
 			serialized = out.toByteArray();
 		}
 
-		serializationProxy =
-			new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
+		Set<String> cnfThrowingSerializerClasses = new HashSet<>();
+		cnfThrowingSerializerClasses.add(IntSerializer.class.getName());
+		cnfThrowingSerializerClasses.add(LongSerializer.class.getName());
+		cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
 
-		// mock failure when deserializing serializers
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		serializationProxy =
+			new KeyedBackendSerializationProxy<>(
+				new ArtificialCNFExceptionThrowingClassLoader(
+					Thread.currentThread().getContextClassLoader(),
+					cnfThrowingSerializerClasses));
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
-		Assert.assertEquals(null, serializationProxy.getKeySerializer());
+		Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 
 		for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) {
-			Assert.assertEquals(null, meta.getNamespaceSerializer());
-			Assert.assertEquals(null, meta.getStateSerializer());
+			Assert.assertTrue(meta.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
+			Assert.assertTrue(meta.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
 			Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
 			Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
 		}
@@ -183,21 +177,23 @@ public class SerializationProxiesTest {
 			serialized = out.toByteArray();
 		}
 
-		// mock failure when deserializing serializer
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		Set<String> cnfThrowingSerializerClasses = new HashSet<>();
+		cnfThrowingSerializerClasses.add(LongSerializer.class.getName());
+		cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters
-				.getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.getReaderForVersion(
+					KeyedBackendSerializationProxy.VERSION,
+					new ArtificialCNFExceptionThrowingClassLoader(
+						Thread.currentThread().getContextClassLoader(),
+						cnfThrowingSerializerClasses))
 				.readStateMetaInfo(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(name, metaInfo.getName());
-		Assert.assertEquals(null, metaInfo.getNamespaceSerializer());
-		Assert.assertEquals(null, metaInfo.getStateSerializer());
+		Assert.assertTrue(metaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
+		Assert.assertTrue(metaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), metaInfo.getNamespaceSerializerConfigSnapshot());
 		Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getStateSerializerConfigSnapshot());
 	}
@@ -325,20 +321,22 @@ public class SerializationProxiesTest {
 			serialized = out.toByteArray();
 		}
 
-		// mock failure when deserializing serializer
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		Set<String> cnfThrowingSerializerClasses = new HashSet<>();
+		cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
+		cnfThrowingSerializerClasses.add(StringSerializer.class.getName());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
-				.getOperatorStateReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+				.getOperatorStateReaderForVersion(
+					OperatorBackendSerializationProxy.VERSION,
+					new ArtificialCNFExceptionThrowingClassLoader(
+						Thread.currentThread().getContextClassLoader(),
+						cnfThrowingSerializerClasses))
 				.readOperatorStateMetaInfo(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(name, metaInfo.getName());
-		Assert.assertEquals(null, metaInfo.getPartitionStateSerializer());
+		Assert.assertTrue(metaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getPartitionStateSerializerConfigSnapshot());
 	}
 
@@ -361,22 +359,24 @@ public class SerializationProxiesTest {
 			serialized = out.toByteArray();
 		}
 
-		// mock failure when deserializing serializer
-		TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
-				mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
-		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
-		PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+		Set<String> cnfThrowingSerializerClasses = new HashSet<>();
+		cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
+		cnfThrowingSerializerClasses.add(StringSerializer.class.getName());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			broadcastMetaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
-					.getBroadcastStateReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+					.getBroadcastStateReaderForVersion(
+						OperatorBackendSerializationProxy.VERSION,
+						new ArtificialCNFExceptionThrowingClassLoader(
+							Thread.currentThread().getContextClassLoader(),
+							cnfThrowingSerializerClasses))
 					.readBroadcastStateMetaInfo(new DataInputViewStreamWrapper(in));
 		}
 
 		Assert.assertEquals(broadcastName, broadcastMetaInfo.getName());
-		Assert.assertEquals(null, broadcastMetaInfo.getKeySerializer());
+		Assert.assertTrue(broadcastMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), broadcastMetaInfo.getKeySerializerConfigSnapshot());
-		Assert.assertEquals(null, broadcastMetaInfo.getValueSerializer());
+		Assert.assertTrue(broadcastMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer);
 		Assert.assertEquals(valueSerializer.snapshotConfiguration(), broadcastMetaInfo.getValueSerializerConfigSnapshot());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c541a1f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0c843db..c38f106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -74,6 +74,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
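
The test changes above drop the PowerMock-based mocking and instead pass a class loader that fails for a chosen set of class names. The source of ArtificialCNFExceptionThrowingClassLoader is not part of this section, so the following is only a sketch of the idea under a hypothetical name (FailingClassLoader), assuming the loader delegates to its parent for every name that is not blocked.

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hypothetical sketch: a class loader that throws ClassNotFoundException
 * for a configured set of class names and delegates everything else.
 */
class FailingClassLoader extends ClassLoader {

    private final Set<String> blockedClassNames;

    FailingClassLoader(ClassLoader parent, Set<String> blockedClassNames) {
        super(parent);
        this.blockedClassNames = blockedClassNames;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (blockedClassNames.contains(name)) {
            // Simulate a class that is missing from the user code classpath.
            throw new ClassNotFoundException("Artificial failure for " + name);
        }
        return super.loadClass(name, resolve);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = new FailingClassLoader(
            ClassLoader.getSystemClassLoader(),
            Collections.singleton("java.util.ArrayList"));

        // Names that are not blocked resolve through the parent as usual.
        System.out.println(loader.loadClass("java.util.HashMap"));

        try {
            loader.loadClass("java.util.ArrayList");
        } catch (ClassNotFoundException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

Because the failure is produced by the class loader rather than by a mocked constructor, tests built this way need neither a special JUnit runner nor prepared classes, which matches the removal of the @RunWith and @PrepareForTest annotations in the diff.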


[5/8] flink git commit: [FLINK-9287][kafka] Ensure threads count do not grow in FlinkKafkaProducer011

Posted by tz...@apache.org.
[FLINK-9287][kafka] Ensure threads count do not grow in FlinkKafkaProducer011

This closes #5952.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8657bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8657bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8657bf9

Branch: refs/heads/master
Commit: c8657bf9dee23bc13a281423284644013de9b850
Parents: 54f104b
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:53:40 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:27 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011ITCase.java      | 62 ++++++++++++++++----
 1 file changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8657bf9/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 361f269..36cb362 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -46,9 +46,12 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -76,6 +79,43 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		extraProperties.put("isolation.level", "read_committed");
 	}
 
+	@Test
+	public void resourceCleanUpNone() throws Exception {
+		resourceCleanUp(Semantic.NONE);
+	}
+
+	@Test
+	public void resourceCleanUpAtLeastOnce() throws Exception {
+		resourceCleanUp(Semantic.AT_LEAST_ONCE);
+	}
+
+	/**
+	 * This test checks whether there is a resource leak in the form of a growing thread count.
+	 */
+	public void resourceCleanUp(Semantic semantic) throws Exception {
+		String topic = "flink-kafka-producer-resource-cleanup-" + semantic;
+
+		final int allowedEpsilonThreadCountGrow = 50;
+
+		Optional<Integer> initialActiveThreads = Optional.empty();
+		for (int i = 0; i < allowedEpsilonThreadCountGrow * 2; i++) {
+			try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 =
+					createTestHarness(topic, 1, 1, 0, semantic)) {
+				testHarness1.setup();
+				testHarness1.open();
+			}
+
+			if (initialActiveThreads.isPresent()) {
+				assertThat("active threads count",
+					Thread.activeCount(),
+					lessThan(initialActiveThreads.get() + allowedEpsilonThreadCountGrow));
+			}
+			else {
+				initialActiveThreads = Optional.of(Thread.activeCount());
+			}
+		}
+	}
+
 	/**
 	 * This test ensures that transactions reusing transactional.ids (after returning to the pool) will not clash
 	 * with previous transactions using same transactional.ids.
@@ -176,7 +216,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			topic,
 			integerKeyedSerializationSchema,
 			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+			Semantic.EXACTLY_ONCE);
 
 		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
 			new StreamSink<>(kafkaProducer),
@@ -327,7 +367,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 				topic,
 				preScaleDownParallelism,
 				preScaleDownParallelism,
-				subtaskIndex);
+				subtaskIndex,
+				Semantic.EXACTLY_ONCE);
 
 			preScaleDownOperator.setup();
 			preScaleDownOperator.open();
@@ -342,7 +383,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		// there might not be any close)
 
 		// After previous failure simulate restarting application with smaller parallelism
-		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
+		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
 
 		postScaleDownOperator1.setup();
 		postScaleDownOperator1.open();
@@ -443,7 +484,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 
 		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
 			OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
-				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex, Semantic.EXACTLY_ONCE);
 			testHarnesses.add(testHarness);
 
 			testHarness.setup();
@@ -564,21 +605,22 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 	}
 
 	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
-		return createTestHarness(topic, 1, 1, 0);
+		return createTestHarness(topic, 1, 1, 0, Semantic.EXACTLY_ONCE);
 	}
 
 	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
-		String topic,
-		int maxParallelism,
-		int parallelism,
-		int subtaskIndex) throws Exception {
+			String topic,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			Semantic semantic) throws Exception {
 		Properties properties = createProperties();
 
 		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
 			topic,
 			integerKeyedSerializationSchema,
 			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+			semantic);
 
 		return new OneInputStreamOperatorTestHarness<>(
 			new StreamSink<>(kafkaProducer),
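
The resourceCleanUp test added above opens and closes the operator repeatedly and asserts that Thread.activeCount() stays within a fixed margin of the count taken after the first cycle. The same pattern can be sketched without Kafka or Flink. In the sketch below, Worker is a hypothetical resource that owns one background thread, and ThreadGrowthCheck and maxThreadGrowth are illustrative names. Thread.activeCount() is only an estimate, which is presumably why the original test allows a margin instead of requiring equality.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a thread-leak check based on Thread.activeCount(). */
class ThreadGrowthCheck {

    /** Resource that owns one background thread and releases it in close(). */
    static final class Worker implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        Worker() {
            // Submitting a task forces the executor to start its thread.
            executor.submit(() -> { });
        }

        @Override
        public void close() {
            executor.shutdown();
            try {
                executor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Opens and closes a Worker the given number of times and returns the largest
     * growth of the active thread count relative to the count after the first cycle.
     */
    static int maxThreadGrowth(int iterations) {
        int baseline = -1;
        int maxGrowth = 0;
        for (int i = 0; i < iterations; i++) {
            try (Worker worker = new Worker()) {
                // Nothing to do: the resource is opened and closed again right away.
            }
            int active = Thread.activeCount();
            if (baseline < 0) {
                baseline = active;
            } else {
                maxGrowth = Math.max(maxGrowth, active - baseline);
            }
        }
        return maxGrowth;
    }

    public static void main(String[] args) {
        int allowedGrowth = 50;
        int growth = maxThreadGrowth(allowedGrowth * 2);
        if (growth >= allowedGrowth) {
            throw new AssertionError("thread count grew by " + growth);
        }
        System.out.println("max observed thread growth: " + growth);
    }
}
```

Running twice as many iterations as the allowed margin, as the original test does, means that a resource leaking one thread per cycle is bound to exceed the margin before the loop ends.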


[7/8] flink git commit: [hotfix][docs][QS] Minor cleanup of QS documentation

Posted by tz...@apache.org.
[hotfix][docs][QS] Minor cleanup of QS documentation

- replace usage of deprecated ValueStateDescriptor constructor
- fix artifact-id containing 2 underscores
- remove final keyword from method reference
- add missing space to note


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee246610
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee246610
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee246610

Branch: refs/heads/master
Commit: ee2466108e80e6d8fa447930d437727a3b6e5a15
Parents: 027117e
Author: zentol <ch...@apache.org>
Authored: Mon Nov 13 16:49:32 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:28 2018 +0800

----------------------------------------------------------------------
 docs/dev/stream/state/queryable_state.md | 29 ++++++++++++---------------
 1 file changed, 13 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee246610/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index af646df..16df5b5 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -60,7 +60,7 @@ The Queryable State feature consists of three main entities:
  returning it to the client, and 
  3. the `QueryableStateServer` which runs on each `TaskManager` and is responsible for serving the locally stored state.
  
-In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific 
+The client connects to one of the proxies and sends a request for the state associated with a specific 
 key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in 
 *Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is 
 responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will 
@@ -143,8 +143,7 @@ can be made queryable by making the appropriate state descriptor queryable via
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
                 "average", // the state name
-                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
-                Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
 descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
 
@@ -168,8 +167,8 @@ jar which you have to explicitly include as a dependency in the `pom.xml` of you
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
-  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
+  <artifactId>flink-queryable-state-client-java{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
 </dependency>
 {% endhighlight %}
 </div>
@@ -189,11 +188,11 @@ With the client ready, to query a state of type `V`, associated with a key of ty
 
 {% highlight java %}
 CompletableFuture<S> getKvState(
-    final JobID jobId,
-    final String queryableStateName,
-    final K key,
-    final TypeInformation<K> keyTypeInfo,
-    final StateDescriptor<S, V> stateDescriptor)
+    JobID jobId,
+    String queryableStateName,
+    K key,
+    TypeInformation<K> keyTypeInfo,
+    StateDescriptor<S, V> stateDescriptor)
 {% endhighlight %}
 
 The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified 
@@ -207,7 +206,7 @@ the actual value. This can be any of the state types supported by Flink: `ValueS
 `AggregatingState`, and the currently deprecated `FoldingState`. 
 
 <div class="alert alert-info">
-  <strong>Note:</strong>These state objects do not allow modifications to the contained state. You can use them to get 
+  <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get 
   the actual value of the state, <i>e.g.</i> using <code>valueState.get()</code>, or iterate over
   the contained <code><K, V></code> entries, <i>e.g.</i> using the <code>mapState.entries()</code>, but you cannot 
   modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an 
@@ -224,7 +223,7 @@ the actual value. This can be any of the state types supported by Flink: `ValueS
 
 The following example extends the `CountWindowAverage` example
 (see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
-by making it queryable and showing how to query this value:
+by making it queryable and shows how to query this value:
 
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -249,8 +248,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 new ValueStateDescriptor<>(
                         "average", // the state name
-                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
-                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
         descriptor.setQueryable("query-name");
         sum = getRuntimeContext().getState(descriptor);
     }
@@ -266,8 +264,7 @@ QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
           "average",
-          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
-          Tuple2.of(0L, 0L));
+          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
 
 CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
         client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);


[4/8] flink git commit: [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011

Posted by tz...@apache.org.
[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011

Previously, the internal FlinkKafkaProducer was not closed for the
AT_LEAST_ONCE and NONE semantics when FlinkKafkaProducer011 was closed.
This leaked resources (for example, an increasing number of active threads).
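
The cleanup rule can be sketched in isolation as follows. This is only an
illustration, not the connector's code: Semantic, StubProducer and
closeOnShutdown are hypothetical stand-ins, and the assumption that
EXACTLY_ONCE needs no extra close here is taken from the code comment in
the diff of this commit.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ProducerCloseSketch {

    /** Hypothetical stand-in for the connector's delivery semantics. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Hypothetical stand-in for a producer that holds threads and sockets. */
    static final class StubProducer implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /**
     * Closes the producer explicitly for the modes that reuse one producer.
     * EXACTLY_ONCE is skipped on the assumption that its own abort path
     * already releases the producer.
     */
    static void closeOnShutdown(Semantic semantic, StubProducer producer) {
        switch (semantic) {
            case EXACTLY_ONCE:
                break;
            case AT_LEAST_ONCE:
            case NONE:
                producer.close();
                break;
        }
    }

    public static void main(String[] args) {
        for (Semantic semantic : Semantic.values()) {
            StubProducer producer = new StubProducer();
            closeOnShutdown(semantic, producer);
            System.out.println(semantic + " -> closed=" + producer.isClosed());
        }
    }
}
```

Listing every enum constant in the switch, including the no-op
EXACTLY_ONCE case, keeps the decision explicit for each semantic.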


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54f104bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54f104bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54f104bd

Branch: refs/heads/master
Commit: 54f104bd425ca709babd932dd494d33957df0b1d
Parents: a7ed135
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu May 3 15:50:53 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:27 2018 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java          | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54f104bd/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index e92f38b..0ae5e03b 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -651,6 +651,17 @@ public class FlinkKafkaProducer011<IN>
 		if (currentTransaction != null) {
 			// to avoid exceptions on aborting transactions with some pending records
 			flush(currentTransaction);
+
+			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+			// we need to close it manually
+			switch (semantic) {
+				case EXACTLY_ONCE:
+					break;
+				case AT_LEAST_ONCE:
+				case NONE:
+					currentTransaction.producer.close();
+					break;
+			}
 		}
 		try {
 			super.close();


[2/8] flink git commit: [FLINK-9169] [runtime] Allow specifying serializer presence requirement in KeyedBackendSerializationProxy

Posted by tz...@apache.org.
[FLINK-9169] [runtime] Allow specifying serializer presence requirement in KeyedBackendSerializationProxy

This commit consolidates the logic that decides whether old serializers
are required to be present at restore time into the
KeyedBackendSerializationProxy, via an isSerializerPresenceRequired flag.

Heap-based backends typically set this flag to true, while the RocksDB
state backend sets it to false. If it is set to true, a restored
serializer must not be an UnloadableDummyTypeSerializer; otherwise an
IOException is thrown to fail the restore.

This closes #5950.
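
The effect of the flag can be sketched with plain Java stand-ins. This is
a simplified illustration, not the proxy's actual code: Serializer,
LoadedSerializer, UnloadableDummy and checkRestored are hypothetical
names, and the exception message is shortened.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class SerializerPresenceSketch {

    /** Hypothetical stand-in for a state serializer. */
    interface Serializer {}

    /** Stand-in for a serializer that was read successfully. */
    static final class LoadedSerializer implements Serializer {}

    /** Stand-in for the dummy placeholder used when the old serializer cannot be loaded. */
    static final class UnloadableDummy implements Serializer {}

    /** Fails the restore if presence is required and any restored serializer is a placeholder. */
    static void checkRestored(List<Serializer> restored, boolean presenceRequired) throws IOException {
        if (!presenceRequired) {
            return;
        }
        for (Serializer serializer : restored) {
            if (serializer instanceof UnloadableDummy) {
                throw new IOException("A previous serializer of the keyed state is not present.");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<Serializer> restored = Arrays.asList(new LoadedSerializer(), new UnloadableDummy());

        // Presence not required (the RocksDB case above): placeholders are tolerated.
        checkRestored(restored, false);
        System.out.println("presence not required: restore continues");

        // Presence required (the heap case above): the same input fails the restore.
        try {
            checkRestored(restored, true);
        } catch (IOException e) {
            System.out.println("presence required: " + e.getMessage());
        }
    }
}
```

Keeping the check in one place means each backend only states its
requirement through the flag instead of repeating the instanceof test.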


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7ed135f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7ed135f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7ed135f

Branch: refs/heads/master
Commit: a7ed135f5bd7c139ff756ee15583e918978116dd
Parents: 7c541a1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 3 15:30:55 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:22 2018 +0800

----------------------------------------------------------------------
 .../state/KeyedBackendSerializationProxy.java   | 35 +++++++--
 .../state/heap/HeapKeyedStateBackend.java       | 21 +-----
 .../runtime/state/FileStateBackendTest.java     |  5 ++
 .../runtime/state/MemoryStateBackendTest.java   |  5 ++
 .../runtime/state/SerializationProxiesTest.java |  7 +-
 .../runtime/state/StateBackendTestBase.java     | 77 ++++++++++++++++++++
 .../state/RocksDBKeyedStateBackend.java         | 12 ++-
 .../state/RocksDBStateBackendTest.java          |  5 ++
 8 files changed, 139 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 30b7344..f32e646 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -44,6 +45,9 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 	/** This specifies if we use a compressed format write the key-groups */
 	private boolean usingKeyGroupCompression;
 
+	/** This specifies whether or not to use dummy {@link UnloadableDummyTypeSerializer} when serializers cannot be read. */
+	private boolean isSerializerPresenceRequired;
+
 	private TypeSerializer<K> keySerializer;
 	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
 
@@ -51,8 +55,9 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 
 	private ClassLoader userCodeClassLoader;
 
-	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader, boolean isSerializerPresenceRequired) {
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+		this.isSerializerPresenceRequired = isSerializerPresenceRequired;
 	}
 
 	public KeyedBackendSerializationProxy(
@@ -139,17 +144,35 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab
 			this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0;
 			this.keySerializerConfigSnapshot = keySerializerAndConfig.f1;
 		} else {
-			this.keySerializer = TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader);
+			this.keySerializer = TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true);
 			this.keySerializerConfigSnapshot = null;
 		}
 
+		if (isSerializerPresenceRequired) {
+			checkSerializerPresence(keySerializer);
+		}
+
 		int numKvStates = in.readShort();
 		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
 		for (int i = 0; i < numKvStates; i++) {
-			stateMetaInfoSnapshots.add(
-				KeyedBackendStateMetaInfoSnapshotReaderWriters
-					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
-					.readStateMetaInfo(in));
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> snapshot = KeyedBackendStateMetaInfoSnapshotReaderWriters
+				.getReaderForVersion(getReadVersion(), userCodeClassLoader)
+				.readStateMetaInfo(in);
+
+			if (isSerializerPresenceRequired) {
+				checkSerializerPresence(snapshot.getNamespaceSerializer());
+				checkSerializerPresence(snapshot.getStateSerializer());
+			}
+			stateMetaInfoSnapshots.add(snapshot);
+		}
+	}
+
+	private void checkSerializerPresence(TypeSerializer<?> serializer) throws IOException {
+		if (serializer instanceof UnloadableDummyTypeSerializer) {
+			throw new IOException("Unable to restore keyed state, because a previous serializer" +
+				" of the keyed state is not present The serializer could have been removed from the classpath, " +
+				" or its implementation have changed and could not be loaded. This is a temporary restriction that will" +
+				" be fixed in future versions.");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index d0fb8a7..a02edb0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -74,7 +74,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -344,8 +343,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
+				// isSerializerPresenceRequired flag is set to true, since for the heap state backend,
+				// deserialization of state happens eagerly at restore time
 				KeyedBackendSerializationProxy<K> serializationProxy =
-						new KeyedBackendSerializationProxy<>(userCodeClassLoader);
+						new KeyedBackendSerializationProxy<>(userCodeClassLoader, true);
 
 				serializationProxy.read(inView);
 
@@ -371,22 +372,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						serializationProxy.getStateMetaInfoSnapshots();
 
 				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
-
-					if (restoredMetaInfo.getStateSerializer() == null ||
-							restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer) {
-
-						// must fail now if the previous serializer cannot be restored because there is no serializer
-						// capable of reading previous state
-						// TODO when eager state registration is in place, we can try to get a convert deserializer
-						// TODO from the newly registered serializer instead of simply failing here
-
-						throw new IOException("Unable to restore keyed state [" + restoredMetaInfo.getName() + "]." +
-							" For memory-backed keyed state, the previous serializer of the keyed state must be" +
-							" present; the serializer could have been removed from the classpath, or its implementation" +
-							" have changed and could not be loaded. This is a temporary restriction that will be fixed" +
-							" in future versions.");
-					}
-
 					restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
 					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index ca74852..beea0c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -46,6 +46,11 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 		return false;
 	}
 
+	@Override
+	protected boolean isSerializerPresenceRequiredOnRestore() {
+		return true;
+	}
+
 	// disable these because the verification does not work for this state backend
 	@Override
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index a13bc1d..0ba4c33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -59,6 +59,11 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 		return false;
 	}
 
+	@Override
+	protected boolean isSerializerPresenceRequiredOnRestore() {
+		return true;
+	}
+
 	// disable these because the verification does not work for this state backend
 	@Override
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 5176fed..3f78097 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -67,7 +67,7 @@ public class SerializationProxiesTest {
 		}
 
 		serializationProxy =
-				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
+				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader(), true);
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
@@ -109,11 +109,14 @@ public class SerializationProxiesTest {
 		cnfThrowingSerializerClasses.add(LongSerializer.class.getName());
 		cnfThrowingSerializerClasses.add(DoubleSerializer.class.getName());
 
+		// we want to verify restore resilience when serializer presence is not required;
+		// set isSerializerPresenceRequired to false
 		serializationProxy =
 			new KeyedBackendSerializationProxy<>(
 				new ArtificialCNFExceptionThrowingClassLoader(
 					Thread.currentThread().getContextClassLoader(),
-					cnfThrowingSerializerClasses));
+					cnfThrowingSerializerClasses),
+				false);
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index c38f106..784b628 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -145,6 +145,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	protected abstract B getStateBackend() throws Exception;
 
+	protected abstract boolean isSerializerPresenceRequiredOnRestore();
+
 	protected CheckpointStreamFactory createStreamFactory() throws Exception {
 		if (checkpointStorageLocation == null) {
 			checkpointStorageLocation = getStateBackend()
@@ -1030,6 +1032,78 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testSerializerPresenceOnRestore() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		Environment env = new DummyEnvironment();
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		try {
+			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerPreUpgrade());
+			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// ============== create snapshot, using the old serializer ==============
+
+			// make some modifications
+			backend.setCurrentKey(1);
+			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
+
+			backend.setCurrentKey(2);
+			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
+
+			KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+			snapshot1.registerSharedStates(sharedStateRegistry);
+			backend.dispose();
+
+			// ========== restore snapshot, using the new serializer (that has different classname) ==========
+
+			// on restore, simulate that the previous serializer class is no longer in the classloader
+			env = new DummyEnvironment(
+				new ArtificialCNFExceptionThrowingClassLoader(
+					getClass().getClassLoader(),
+					Collections.singleton(TestReconfigurableCustomTypeSerializerPreUpgrade.class.getName())));
+
+			try {
+				backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
+			} catch (IOException e) {
+				if (!isSerializerPresenceRequiredOnRestore()) {
+					fail("Presence of old serializer should not have been required.");
+				} else {
+					// test success
+					return;
+				}
+			}
+
+			// if serializer presence is not required, continue on to modify some state to make sure that everything works correctly
+			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerUpgraded());
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			backend.setCurrentKey(1);
+			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
+
+			backend.setCurrentKey(2);
+			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
+
+			KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462379L,
+				3,
+				streamFactory,
+				CheckpointOptions.forCheckpointWithDefaultLocation()));
+
+			snapshot2.registerSharedStates(sharedStateRegistry);
+			snapshot1.discardState();
+		} finally {
+			backend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -4261,6 +4335,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 	}
 
+	public static class TestReconfigurableCustomTypeSerializerPreUpgrade extends TestReconfigurableCustomTypeSerializer {}
+	public static class TestReconfigurableCustomTypeSerializerUpgraded extends TestReconfigurableCustomTypeSerializer {}
+
 	/**
 	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 469247c..90d0fc6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -588,8 +588,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
+			// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
+			// deserialization of state happens lazily during runtime; we depend on the fact
+			// that the new serializer for states could be compatible, and therefore the restore can continue
+			// without old serializers required to be present.
 			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
 
 			serializationProxy.read(currentStateHandleInView);
 
@@ -925,8 +929,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				inputStream = metaStateHandle.openInputStream();
 				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
+				// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
+				// deserialization of state happens lazily during runtime; we depend on the fact
+				// that the new serializer for states could be compatible, and therefore the restore can continue
+				// without old serializers required to be present.
 				KeyedBackendSerializationProxy<T> serializationProxy =
-					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
+					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed135f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index f54d0e3..ad89583 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -127,6 +127,11 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		return backend;
 	}
 
+	@Override
+	protected boolean isSerializerPresenceRequiredOnRestore() {
+		return false;
+	}
+
 	// small safety net for instance cleanups, so that no native objects are left
 	@After
 	public void cleanupRocksDB() {


[6/8] flink git commit: [hotfix] Correct the javadoc of StandaloneCheckpointIDCounter#getLast

Posted by tz...@apache.org.
[hotfix] Correct the javadoc of StandaloneCheckpointIDCounter#getLast

This closes #5956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/027117ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/027117ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/027117ec

Branch: refs/heads/master
Commit: 027117ec0d2d020ac20fbb66142eb86b62b9835b
Parents: bcb6b84
Author: Qiu Congxian/klion26 <qc...@gmail.com>
Authored: Sun May 6 11:19:07 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:28 2018 +0800

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/027117ec/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index e4ed996..f43df5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -49,7 +49,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 	}
 
 	/**
-	 * Returns the last checkpoint ID (current - 10.
+	 * Returns the last checkpoint ID (current - 1).
 	 *
 	 * @return Last checkpoint ID.
 	 */


[3/8] flink git commit: [hotfix ] Fix incorrect parallelism in the Javadoc of ListCheckpointed

Posted by tz...@apache.org.
[hotfix ] Fix incorrect parallelism in the Javadoc of ListCheckpointed

This closes #5918.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcb6b848
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcb6b848
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcb6b848

Branch: refs/heads/master
Commit: bcb6b848484cc61aa6ac0a61662f7d1fad1fd800
Parents: c8657bf
Author: maqingxiang-it <ma...@dev05v.sys.corp.qihoo.net>
Authored: Thu Apr 26 18:07:54 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:27 2018 +0800

----------------------------------------------------------------------
 .../apache/flink/streaming/api/checkpoint/ListCheckpointed.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcb6b848/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 0b8b1b6..b789a51 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -66,7 +66,7 @@ import java.util.List;
  * +----+   +----+   +----+   +----+   +----+
  * </pre>
 
- * Recovering the checkpoint with <i>parallelism = 5</i> yields the following state assignment:
+ * Recovering the checkpoint with <i>parallelism = 2</i> yields the following state assignment:
  * <pre>
  *      func_1          func_2
  * +----+----+----+   +----+----+
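
The state assignment in the corrected Javadoc (three elements for func_1,
two for func_2) can be reproduced with a small sketch. The contiguous,
near-equal split below is an assumption made for illustration; it is not
claimed to be the strategy Flink uses internally, and redistribute is a
hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistributionSketch {

    /**
     * Splits the checkpointed elements into contiguous, near-equal chunks,
     * one per parallel instance. Earlier instances receive the extra
     * elements when the split is uneven.
     */
    static <T> List<List<T>> redistribute(List<T> elements, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> assignment = new ArrayList<>(parallelism);
        int base = elements.size() / parallelism;
        int remainder = elements.size() % parallelism;
        int start = 0;
        for (int i = 0; i < parallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            assignment.add(new ArrayList<>(elements.subList(start, start + size)));
            start += size;
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> checkpointed = Arrays.asList("s1", "s2", "s3", "s4", "s5");
        // prints [[s1, s2, s3], [s4, s5]]
        System.out.println(redistribute(checkpointed, 2));
    }
}
```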


[8/8] flink git commit: [FLINK-8064][docs][QS] List flink-core as dependency

Posted by tz...@apache.org.
[FLINK-8064][docs][QS] List flink-core as dependency

This closes #5006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7dbc8ac6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7dbc8ac6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7dbc8ac6

Branch: refs/heads/master
Commit: 7dbc8ac6ae6fe35de15038668124818a90f6fdd2
Parents: ee24661
Author: zentol <ch...@apache.org>
Authored: Mon Nov 13 16:52:00 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Mon May 7 10:56:28 2018 +0800

----------------------------------------------------------------------
 docs/dev/stream/state/queryable_state.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbc8ac6/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 16df5b5..4f3e802 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -161,12 +161,17 @@ So far, you have set up your cluster to run with queryable state and you have de
 queryable. Now it is time to see how to query this state. 
 
 For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client` 
-jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below:
+jar which must be explicitly included as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below:
 
 <div data-lang="java" markdown="1">
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
+  <artifactId>flink-core</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
   <artifactId>flink-queryable-state-client-java{{ site.scala_version_suffix }}</artifactId>
   <version>{{ site.version }}</version>
 </dependency>