You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/09/21 13:31:25 UTC

[flink] branch master updated: [FLINK-10157][State TTL] Allow `null` user values in map state with TTL

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f343204  [FLINK-10157][State TTL] Allow `null` user values in map state with TTL
f343204 is described below

commit f3432042fef908829dbdc1dcdc2b869fa6fc4005
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 12 18:12:14 2018 +0200

    [FLINK-10157][State TTL] Allow `null` user values in map state with TTL
    
    This closes #6707.
---
 docs/dev/stream/state/state.md                     |   3 +
 .../flink/api/common/state/MapStateDescriptor.java |   6 +
 .../flink/api/common/state/StateTtlConfig.java     |   6 +
 .../java/typeutils/runtime/NullableSerializer.java | 285 +++++++++++++++++++++
 .../api/common/typeutils/SerializerTestBase.java   | 167 ++++++------
 .../typeutils/runtime/NullableSerializerTest.java  |  87 +++++++
 .../runtime/state/ttl/AbstractTtlDecorator.java    |  10 +-
 .../flink/runtime/state/ttl/TtlMapState.java       |  17 +-
 .../apache/flink/runtime/state/ttl/TtlUtils.java   |   2 +-
 .../apache/flink/runtime/state/ttl/TtlValue.java   |   7 +-
 .../ttl/TtlMapStateAllEntriesTestContext.java      |  11 +-
 .../ttl/TtlMapStatePerElementTestContext.java      |   5 +-
 .../ttl/TtlMapStatePerNullElementTestContext.java} |  37 +--
 .../flink/runtime/state/ttl/TtlStateTestBase.java  |   1 +
 14 files changed, 530 insertions(+), 114 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index fb78776..decf1db 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -350,6 +350,9 @@ will lead to compatibility failure and `StateMigrationException`.
 
 - The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.
 
+- The map state with TTL currently supports null user values only if the user value serializer can handle null values. 
+If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form.
+
 #### Cleanup of Expired State
 
 Currently, expired values are only removed when they are read out explicitly, 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 6eb8ddc..9b0094d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -37,6 +37,12 @@ import java.util.Map;
  * <p>To create keyed map state (on a KeyedStream), use
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
  *
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
+ *
  * @param <UK> The type of the keys that can be added to the map state.
  */
 @PublicEvolving
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index f4ed929..42eaea4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -32,6 +32,12 @@ import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCrea
 
 /**
  * Configuration of state TTL logic.
+ *
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
  */
 public class StateTtlConfig implements Serializable {
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
new file mode 100644
index 0000000..fe392e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support of {@code null} value serialization.
+ *
+ * <p>If the target serializer does not support {@code null} values of its type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of {@code null} value serialization
+ * which comes at the cost of an additional byte in the final serialized value.
+ * The {@code NullableSerializer} will intercept {@code null} value serialization case
+ * and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer, padNullValueIfFixedLen);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValueIfFixedLen);
+ * }</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+	private static final long serialVersionUID = 3335569358214720033L;
+	private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+	@Nonnull
+	private final TypeSerializer<T> originalSerializer;
+	private final byte[] padding;
+
+	private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+		this.originalSerializer = originalSerializer;
+		this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);
+
+	}
+
+	private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
+		boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
+		return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
+	}
+
+	/**
+	 * This method tries to serialize a {@code null} value with the {@code originalSerializer}
+	 * and wraps it if that serialization fails, otherwise it returns the {@code originalSerializer}.
+	 *
+	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
+	 * @return serializer which supports {@code null} values
+	 */
+	public static <T> TypeSerializer<T> wrapIfNullIsNotSupported(
+		@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+		return checkIfNullSupported(originalSerializer) ?
+			originalSerializer : wrap(originalSerializer, padNullValueIfFixedLen);
+	}
+
+	/**
+	 * This method checks if {@code serializer} supports {@code null} value.
+	 *
+	 * @param serializer serializer to check
+	 */
+	public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) {
+		int length = serializer.getLength() > 0 ? serializer.getLength() : 1;
+		DataOutputSerializer dos = new DataOutputSerializer(length);
+		try {
+			serializer.serialize(null, dos);
+		} catch (IOException | RuntimeException e) {
+			return false;
+		}
+		Preconditions.checkArgument(
+			serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
+			"The serialized form of the null value should have the same length " +
+				"as any other if the length is fixed in the serializer");
+		DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
+		try {
+			Preconditions.checkArgument(serializer.deserialize(dis) == null);
+		} catch (IOException e) {
+			throw new RuntimeException(
+				String.format("Unexpected failure to deserialize just serialized null value with %s",
+					serializer.getClass().getName()), e);
+		}
+		Preconditions.checkArgument(
+			serializer.copy(null) == null,
+			"Serializer %s has to be able properly copy null value if it can serialize it",
+			serializer.getClass().getName());
+		return true;
+	}
+
+	private boolean padNullValue() {
+		return padding.length > 0;
+	}
+
+	/**
+	 * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
+	 *
+	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
+	 * @return wrapped serializer which supports {@code null} values
+	 */
+	public static <T> TypeSerializer<T> wrap(
+		@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+		return originalSerializer instanceof NullableSerializer ?
+			originalSerializer : new NullableSerializer<>(originalSerializer, padNullValueIfFixedLen);
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return originalSerializer.isImmutableType();
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate();
+		return duplicateOriginalSerializer == originalSerializer ?
+			this : new NullableSerializer<>(originalSerializer.duplicate(), padNullValue());
+	}
+
+	@Override
+	public T createInstance() {
+		return originalSerializer.createInstance();
+	}
+
+	@Override
+	public T copy(T from) {
+		return from == null ? null : originalSerializer.copy(from);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return from == null ? null :
+			(reuse == null ? originalSerializer.copy(from) : originalSerializer.copy(from, reuse));
+	}
+
+	@Override
+	public int getLength() {
+		return padNullValue() ? 1 + padding.length : -1;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		if (record == null) {
+			target.writeBoolean(true);
+			target.write(padding);
+		} else {
+			target.writeBoolean(false);
+			originalSerializer.serialize(record, target);
+		}
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		boolean isNull = deserializeNull(source);
+		return isNull ? null : originalSerializer.deserialize(source);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		boolean isNull = deserializeNull(source);
+		return isNull ? null : (reuse == null ?
+			originalSerializer.deserialize(source) : originalSerializer.deserialize(reuse, source));
+	}
+
+	private boolean deserializeNull(DataInputView source) throws IOException {
+		boolean isNull = source.readBoolean();
+		if (isNull) {
+			source.skipBytesToRead(padding.length);
+		}
+		return isNull;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		boolean isNull = source.readBoolean();
+		target.writeBoolean(isNull);
+		if (isNull) {
+			target.write(padding);
+		} else {
+			originalSerializer.copy(source, target);
+		}
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this ||
+			(obj != null && obj.getClass() == getClass() &&
+				originalSerializer.equals(((NullableSerializer) obj).originalSerializer));
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return (obj != null && obj.getClass() == getClass() &&
+			originalSerializer.canEqual(((NullableSerializer) obj).originalSerializer));
+	}
+
+	@Override
+	public int hashCode() {
+		return originalSerializer.hashCode();
+	}
+
+	@Override
+	public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new NullableSerializerConfigSnapshot<>(originalSerializer);
+	}
+
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+				((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
+				previousKvSerializersAndConfigs.get(0).f0,
+				UnloadableDummyTypeSerializer.class,
+				previousKvSerializersAndConfigs.get(0).f1,
+				originalSerializer);
+
+			if (!compatResult.isRequiresMigration()) {
+				return CompatibilityResult.compatible();
+			} else if (compatResult.getConvertDeserializer() != null) {
+				return CompatibilityResult.requiresMigration(
+					new NullableSerializer<>(
+						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
+			}
+		}
+
+		return CompatibilityResult.requiresMigration();
+	}
+
+	/**
+	 * Configuration snapshot for serializers of nullable types, containing the
+	 * configuration snapshot of its original serializer.
+	 */
+	@Internal
+	public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		@SuppressWarnings("unused")
+		public NullableSerializerConfigSnapshot() {}
+
+		NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
+			super(originalSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c7..1997866 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -32,6 +32,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
@@ -53,23 +54,23 @@ import org.junit.Test;
  * internal state would be corrupt, which becomes evident when toString is called.
  */
 public abstract class SerializerTestBase<T> extends TestLogger {
-	
+
 	protected abstract TypeSerializer<T> createSerializer();
 
 	/**
 	 * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
-	 * 
+	 *
 	 * <p>The expected length should be positive, for fix-length data types, or {@code -1} for
 	 * variable-length types.
 	 */
 	protected abstract int getLength();
-	
+
 	protected abstract Class<T> getTypeClass();
-	
+
 	protected abstract T[] getTestData();
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Test
 	public void testInstantiate() {
 		try {
@@ -80,13 +81,13 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			}
 			T instance = serializer.createInstance();
 			assertNotNull("The created instance must not be null.", instance);
-			
+
 			Class<T> type = getTypeClass();
 			assertNotNull("The test is corrupt: type class is null.", type);
 
 			if (!type.isAssignableFrom(instance.getClass())) {
 				fail("Type of the instantiated object is wrong. " +
-						"Expected Type: " + type + " present type " + instance.getClass());
+					"Expected Type: " + type + " present type " + instance.getClass());
 			}
 		}
 		catch (Exception e) {
@@ -127,7 +128,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
 		assertTrue(strategy.isRequiresMigration());
 	}
-	
+
 	@Test
 	public void testGetLength() {
 		final int len = getLength();
@@ -146,16 +147,16 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopy() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			for (T datum : testData) {
 				T copy = serializer.copy(datum);
-				copy.toString();
+				checkToString(copy);
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 			}
 		}
@@ -165,16 +166,16 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopyIntoNewElements() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			for (T datum : testData) {
 				T copy = serializer.copy(datum, serializer.createInstance());
-				copy.toString();
+				checkToString(copy);
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 			}
 		}
@@ -184,18 +185,18 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopyIntoReusedElements() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			T target = serializer.createInstance();
-			
+
 			for (T datum : testData) {
 				T copy = serializer.copy(datum, target);
-				copy.toString();
+				checkToString(copy);
 				deepEquals("Copied element is not equal to the original element.", datum, copy);
 				target = copy;
 			}
@@ -206,25 +207,25 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializeIndividually() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			for (T value : testData) {
 				TestOutputView out = new TestOutputView();
 				serializer.serialize(value, out);
 				TestInputView in = out.getInputView();
-				
+
 				assertTrue("No data available during deserialization.", in.available() > 0);
-				
+
 				T deserialized = serializer.deserialize(serializer.createInstance(), in);
- 				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", value, deserialized);
-				
+
 				assertTrue("Trailing data available after deserialization.", in.available() == 0);
 			}
 		}
@@ -241,23 +242,23 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			T reuseValue = serializer.createInstance();
-			
+
 			for (T value : testData) {
 				TestOutputView out = new TestOutputView();
 				serializer.serialize(value, out);
 				TestInputView in = out.getInputView();
-				
+
 				assertTrue("No data available during deserialization.", in.available() > 0);
-				
+
 				T deserialized = serializer.deserialize(reuseValue, in);
-				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", value, deserialized);
-				
+
 				assertTrue("Trailing data available after deserialization.", in.available() == 0);
-				
+
 				reuseValue = deserialized;
 			}
 		}
@@ -267,29 +268,29 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializeAsSequenceNoReuse() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			TestOutputView out = new TestOutputView();
 			for (T value : testData) {
 				serializer.serialize(value, out);
 			}
-			
+
 			TestInputView in = out.getInputView();
-			
+
 			int num = 0;
 			while (in.available() > 0) {
 				T deserialized = serializer.deserialize(in);
-				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				num++;
 			}
-			
+
 			assertEquals("Wrong number of elements deserialized.", testData.length, num);
 		}
 		catch (Exception e) {
@@ -298,31 +299,31 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializeAsSequenceReusingValues() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			TestOutputView out = new TestOutputView();
 			for (T value : testData) {
 				serializer.serialize(value, out);
 			}
-			
+
 			TestInputView in = out.getInputView();
 			T reuseValue = serializer.createInstance();
-			
+
 			int num = 0;
 			while (in.available() > 0) {
 				T deserialized = serializer.deserialize(reuseValue, in);
-				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				reuseValue = deserialized;
 				num++;
 			}
-			
+
 			assertEquals("Wrong number of elements deserialized.", testData.length, num);
 		}
 		catch (Exception e) {
@@ -331,30 +332,30 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializedCopyIndividually() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			for (T value : testData) {
 				TestOutputView out = new TestOutputView();
 				serializer.serialize(value, out);
-				
+
 				TestInputView source = out.getInputView();
 				TestOutputView target = new TestOutputView();
 				serializer.copy(source, target);
-				
+
 				TestInputView toVerify = target.getInputView();
-				
+
 				assertTrue("No data available copying.", toVerify.available() > 0);
-				
+
 				T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
-				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", value, deserialized);
-				
+
 				assertTrue("Trailing data available after deserialization.", toVerify.available() == 0);
 			}
 		}
@@ -364,36 +365,36 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
-	
+
+
 	@Test
 	public void testSerializedCopyAsSequence() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();
-			
+
 			TestOutputView out = new TestOutputView();
 			for (T value : testData) {
 				serializer.serialize(value, out);
 			}
-			
+
 			TestInputView source = out.getInputView();
 			TestOutputView target = new TestOutputView();
 			for (int i = 0; i < testData.length; i++) {
 				serializer.copy(source, target);
 			}
-			
+
 			TestInputView toVerify = target.getInputView();
 			int num = 0;
-			
+
 			while (toVerify.available() > 0) {
 				T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
-				deserialized.toString();
+				checkToString(deserialized);
 
 				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
 				num++;
 			}
-			
+
 			assertEquals("Wrong number of elements copied.", testData.length, num);
 		}
 		catch (Exception e) {
@@ -402,7 +403,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializabilityAndEquals() {
 		try {
@@ -414,7 +415,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 				fail("The serializer is not serializable: " + e);
 				return;
 			}
-			
+
 			assertEquals("The copy of the serializer is not equal to the original one.", ser1, ser2);
 		}
 		catch (Exception e) {
@@ -423,10 +424,26 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
-	
+
+	@Test
+	public void testNullability() {
+		TypeSerializer<T> serializer = getSerializer();
+		try {
+			NullableSerializer.checkIfNullSupported(serializer);
+		} catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			fail("Unexpected failure of null value handling: " + t.getMessage());
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected void deepEquals(String message, T should, T is) {
+		Assert.assertTrue((should == null && is == null) || (should != null && is != null));
+		if (should == null) {
+			return;
+		}
 		if (should.getClass().isArray()) {
 			if (should instanceof boolean[]) {
 				Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
@@ -463,9 +480,9 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			assertEquals(message,  should, is);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected TypeSerializer<T> getSerializer() {
 		TypeSerializer<T> serializer = createSerializer();
 		if (serializer == null) {
@@ -473,7 +490,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 		return serializer;
 	}
-	
+
 	private T[] getData() {
 		T[] data = getTestData();
 		if (data == null) {
@@ -481,15 +498,15 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 		}
 		return data;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static final class TestOutputView extends DataOutputStream implements DataOutputView {
-		
+
 		public TestOutputView() {
 			super(new ByteArrayOutputStream(4096));
 		}
-		
+
 		public TestInputView getInputView() {
 			ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
 			return new TestInputView(baos.toByteArray());
@@ -509,8 +526,8 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			write(buffer);
 		}
 	}
-	
-	
+
+
 	private static final class TestInputView extends DataInputStream implements DataInputView {
 
 		public TestInputView(byte[] data) {
@@ -542,4 +559,10 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			return getClass().hashCode();
 		}
 	}
+
+	private static <T> void checkToString(T value) {
+		if (value != null) {
+			value.toString();
+		}
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
new file mode 100644
index 0000000..3bd176a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link NullableSerializer}. */
+@RunWith(Parameterized.class)
+public class NullableSerializerTest extends SerializerTestBase<Integer> {
+	private static final TypeSerializer<Integer> originalSerializer = IntSerializer.INSTANCE;
+
+	@Parameterized.Parameters(name = "{0}")
+	public static List<Boolean> whetherToPadNullValue() {
+		return Arrays.asList(true, false);
+	}
+
+	@Parameterized.Parameter
+	public boolean padNullValue;
+
+	private TypeSerializer<Integer> nullableSerializer;
+
+	@Before
+	public void init() {
+		nullableSerializer = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+	}
+
+	@Override
+	protected TypeSerializer<Integer> createSerializer() {
+		return NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+	}
+
+	@Override
+	protected int getLength() {
+		return padNullValue ? 5 : -1;
+	}
+
+	@Override
+	protected Class<Integer> getTypeClass() {
+		return Integer.class;
+	}
+
+	@Override
+	protected Integer[] getTestData() {
+		return new Integer[] { 5, -1, 0, null };
+	}
+
+	@Test
+	public void testWrappingNotNeeded() {
+		assertEquals(NullableSerializer.wrapIfNullIsNotSupported(StringSerializer.INSTANCE, padNullValue), StringSerializer.INSTANCE);
+	}
+
+	@Test
+	public void testWrappingNeeded() {
+		assertTrue(nullableSerializer instanceof NullableSerializer);
+		assertEquals(NullableSerializer.wrapIfNullIsNotSupported(nullableSerializer, padNullValue), nullableSerializer);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 268f84a..3b0a99f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -81,6 +81,14 @@ abstract class AbstractTtlDecorator<T> {
 		SupplierWithException<TtlValue<V>, SE> getter,
 		ThrowingConsumer<TtlValue<V>, CE> updater,
 		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
+		TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
+		return ttlValue == null ? null : ttlValue.getUserValue();
+	}
+
+	<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
+		SupplierWithException<TtlValue<V>, SE> getter,
+		ThrowingConsumer<TtlValue<V>, CE> updater,
+		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
 		TtlValue<V> ttlValue = getter.get();
 		if (ttlValue == null) {
 			return null;
@@ -92,6 +100,6 @@ abstract class AbstractTtlDecorator<T> {
 		} else if (updateTsOnRead) {
 			updater.accept(rewrapWithNewTs(ttlValue));
 		}
-		return ttlValue.getUserValue();
+		return ttlValue;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 160dbeb..f84a2ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -54,7 +54,13 @@ class TtlMapState<K, N, UK, UV>
 
 	@Override
 	public UV get(UK key) throws Exception {
-		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+		TtlValue<UV> wrapped = getWrapped(key);
+		return wrapped != null ? wrapped.getUserValue() : null; // null: no wrapper returned, or the stored user value is null
+	}
+
+	private TtlValue<UV> getWrapped(UK key) throws Exception { // yields the TTL wrapper, not the bare user value
+		return getWrappedWithTtlCheckAndUpdate(
+			() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); // getter, updater, clear for this key
 	}
 
 	@Override
@@ -83,7 +89,8 @@ class TtlMapState<K, N, UK, UV>
 
 	@Override
 	public boolean contains(UK key) throws Exception {
-		return get(key) != null;
+		// Presence is decided on the TTL wrapper, so an entry holding a null user value still counts as contained.
+		return getWrapped(key) != null;
 	}
 
 	@Override
@@ -161,16 +168,16 @@ class TtlMapState<K, N, UK, UV>
 		}
 
 		private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-			UV unexpiredValue;
+			TtlValue<UV> unexpiredWrapper; // holds the TTL wrapper, so a null user value is not mistaken for "no entry"
 			try {
-				unexpiredValue = getWithTtlCheckAndUpdate(
+				unexpiredWrapper = getWrappedWithTtlCheckAndUpdate(
 					e::getValue,
 					v -> original.put(e.getKey(), v),
 					originalIterator::remove);
 			} catch (Exception ex) {
 				throw new FlinkRuntimeException(ex);
 			}
-			return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+			return unexpiredWrapper != null ? new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredWrapper.getUserValue()) : null;
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 773fe7c..be231c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -44,6 +44,6 @@ class TtlUtils {
 	}
 
 	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
-		return value == null ? null : new TtlValue<>(value, ts);
+		return new TtlValue<>(value, ts); // a null value is wrapped as well instead of collapsing to a null wrapper
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index a8bcadf..48435d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
@@ -30,15 +30,16 @@ import java.io.Serializable;
 class TtlValue<T> implements Serializable {
 	private static final long serialVersionUID = 5221129704201125020L;
 
+	@Nullable
 	private final T userValue;
 	private final long lastAccessTimestamp;
 
-	TtlValue(T userValue, long lastAccessTimestamp) {
-		Preconditions.checkNotNull(userValue);
+	TtlValue(@Nullable T userValue, long lastAccessTimestamp) { // null user values are accepted, hence no null check
 		this.userValue = userValue;
 		this.lastAccessTimestamp = lastAccessTimestamp;
 	}
 
+	@Nullable // the wrapped user value may itself be null
 	T getUserValue() {
 		return userValue;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
index 7fd61aa..7294b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -35,9 +36,9 @@ class TtlMapStateAllEntriesTestContext extends
 	void initTestValues() {
 		emptyValue = Collections.emptySet();
 
-		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
-		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
-		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(23, null), Tuple2.of(10, "10"));
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(24, null), Tuple2.of(7, "7"));
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(25, null), Tuple2.of(4, "4"));
 
 		getUpdateEmpty = updateEmpty.entrySet();
 		getUnexpired = updateUnexpired.entrySet();
@@ -46,7 +47,9 @@ class TtlMapStateAllEntriesTestContext extends
 
 	@SafeVarargs
 	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
-		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+		Map<UK, UV> result = new HashMap<>(); // filled by hand: HashMap accepts the null values that Collectors.toMap rejects
+		Arrays.asList(entries).forEach(entry -> result.put(entry.f0, entry.f1));
+		return result;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
index fb025af..a77c8ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -43,7 +43,10 @@ class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, St
 
 	@Override
 	String get() throws Exception {
-		return ttlState.get(TEST_KEY);
+		String value = ttlState.get(TEST_KEY);
+		// contains() must agree with whether a raw entry exists; JUnit is used because a bare `assert` is skipped without -ea.
+		org.junit.Assert.assertEquals(getOriginal() != null, ttlState.contains(TEST_KEY));
+		return value;
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
index a8bcadf..6f8c70b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
@@ -18,32 +18,15 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * This class wraps user value of state with TTL.
- *
- * @param <T> Type of the user value of state with TTL
- */
-class TtlValue<T> implements Serializable {
-	private static final long serialVersionUID = 5221129704201125020L;
-
-	private final T userValue;
-	private final long lastAccessTimestamp;
-
-	TtlValue(T userValue, long lastAccessTimestamp) {
-		Preconditions.checkNotNull(userValue);
-		this.userValue = userValue;
-		this.lastAccessTimestamp = lastAccessTimestamp;
-	}
-
-	T getUserValue() {
-		return userValue;
-	}
-
-	long getLastAccessTimestamp() {
-		return lastAccessTimestamp;
+class TtlMapStatePerNullElementTestContext extends TtlMapStatePerElementTestContext {
+	@Override
+	void initTestValues() {
+		updateEmpty = null;
+		updateUnexpired = null;
+		updateExpired = null;
+		// Every update above writes null, so every expected read below is null as well.
+		getUpdateEmpty = null;
+		getUnexpired = null;
+		getUpdateExpired = null;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 6b3a15f..f9f108a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -69,6 +69,7 @@ public abstract class TtlStateTestBase {
 			new TtlListStateTestContext(),
 			new TtlMapStateAllEntriesTestContext(),
 			new TtlMapStatePerElementTestContext(),
+			new TtlMapStatePerNullElementTestContext(),
 			new TtlAggregatingStateTestContext(),
 			new TtlReducingStateTestContext(),
 			new TtlFoldingStateTestContext());