You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/09/21 13:33:49 UTC
[flink] branch release-1.6 updated: [FLINK-10157][State TTL] Allow
`null` user values in map state with TTL
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 3558d15 [FLINK-10157][State TTL] Allow `null` user values in map state with TTL
3558d15 is described below
commit 3558d1586c8e57e848efd4bf51aedf4372ea28d8
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 12 18:12:14 2018 +0200
[FLINK-10157][State TTL] Allow `null` user values in map state with TTL
This closes #6707.
---
docs/dev/stream/state/state.md | 3 +
.../flink/api/common/state/MapStateDescriptor.java | 6 +
.../flink/api/common/state/StateTtlConfig.java | 6 +
.../java/typeutils/runtime/NullableSerializer.java | 285 +++++++++++++++++++++
.../api/common/typeutils/SerializerTestBase.java | 167 ++++++------
.../typeutils/runtime/NullableSerializerTest.java | 87 +++++++
.../runtime/state/ttl/AbstractTtlDecorator.java | 10 +-
.../flink/runtime/state/ttl/TtlMapState.java | 17 +-
.../apache/flink/runtime/state/ttl/TtlUtils.java | 2 +-
.../apache/flink/runtime/state/ttl/TtlValue.java | 7 +-
.../ttl/TtlMapStateAllEntriesTestContext.java | 11 +-
.../ttl/TtlMapStatePerElementTestContext.java | 5 +-
.../ttl/TtlMapStatePerNullElementTestContext.java} | 37 +--
.../flink/runtime/state/ttl/TtlStateTestBase.java | 1 +
14 files changed, 530 insertions(+), 114 deletions(-)
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index fb78776..decf1db 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -350,6 +350,9 @@ will lead to compatibility failure and `StateMigrationException`.
- The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.
+- The map state with TTL currently supports null user values only if the user value serializer can handle null values.
+If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form.
+
#### Cleanup of Expired State
Currently, expired values are only removed when they are read out explicitly,
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 6eb8ddc..9b0094d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -37,6 +37,12 @@ import java.util.Map;
* <p>To create keyed map state (on a KeyedStream), use
* {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
*
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
+ *
* @param <UK> The type of the keys that can be added to the map state.
*/
@PublicEvolving
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index f4ed929..42eaea4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -32,6 +32,12 @@ import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCrea
/**
* Configuration of state TTL logic.
+ *
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
*/
public class StateTtlConfig implements Serializable {
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
new file mode 100644
index 0000000..fe392e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support of {@code null} value serialization.
+ *
+ * <p>If the target serializer does not support {@code null} values of its type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of {@code null} value serialization
+ * which comes at the cost of an additional byte in the final serialized value.
+ * The {@code NullableSerializer} will intercept {@code null} value serialization case
+ * and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer, padNullValueIfFixedLen);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValueIfFixedLen);
+ * }}</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+ private static final long serialVersionUID = 3335569358214720033L;
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ @Nonnull
+ private final TypeSerializer<T> originalSerializer;
+ private final byte[] padding;
+
+ private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+ this.originalSerializer = originalSerializer;
+ this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);
+
+ }
+
+ private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
+ boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
+ return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
+ }
+
+ /**
+ * This method tries to serialize {@code null} value with the {@code originalSerializer}
+ * and wraps it if that attempt fails with an {@link IOException} or any {@link RuntimeException}
+ * (for example a {@link NullPointerException}), otherwise it returns the {@code originalSerializer}.
+ *
+ * @param originalSerializer serializer to wrap and add {@code null} support
+ * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
+ * @return serializer which supports {@code null} values
+ */
+ public static <T> TypeSerializer<T> wrapIfNullIsNotSupported(
+ @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+ return checkIfNullSupported(originalSerializer) ?
+ originalSerializer : wrap(originalSerializer, padNullValueIfFixedLen);
+ }
+
+ /**
+ * This method checks if {@code serializer} supports {@code null} value.
+ *
+ * @param serializer serializer to check
+ */
+ public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) {
+ int length = serializer.getLength() > 0 ? serializer.getLength() : 1;
+ DataOutputSerializer dos = new DataOutputSerializer(length);
+ try {
+ serializer.serialize(null, dos);
+ } catch (IOException | RuntimeException e) {
+ return false;
+ }
+ Preconditions.checkArgument(
+ serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
+ "The serialized form of the null value should have the same length " +
+ "as any other if the length is fixed in the serializer");
+ DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
+ try {
+ Preconditions.checkArgument(serializer.deserialize(dis) == null);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unexpected failure to deserialize just serialized null value with %s",
+ serializer.getClass().getName()), e);
+ }
+ Preconditions.checkArgument(
+ serializer.copy(null) == null,
+ "Serializer %s has to be able properly copy null value if it can serialize it",
+ serializer.getClass().getName());
+ return true;
+ }
+
+ private boolean padNullValue() {
+ return padding.length > 0;
+ }
+
+ /**
+ * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
+ *
+ * @param originalSerializer serializer to wrap and add {@code null} support
+ * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
+ * @return wrapped serializer which supports {@code null} values
+ */
+ public static <T> TypeSerializer<T> wrap(
+ @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
+ return originalSerializer instanceof NullableSerializer ?
+ originalSerializer : new NullableSerializer<>(originalSerializer, padNullValueIfFixedLen);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return originalSerializer.isImmutableType();
+ }
+
+ /**
+ * Duplicates the wrapped serializer and re-wraps the result.
+ *
+ * @return this instance if the wrapped serializer's {@code duplicate()} returned the very same object,
+ * otherwise a new {@code NullableSerializer} around that duplicate with the same padding behaviour
+ */
+ @Override
+ public TypeSerializer<T> duplicate() {
+ TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate();
+ // Wrap the duplicate obtained above; calling duplicate() again would create a second, unneeded copy.
+ return duplicateOriginalSerializer == originalSerializer ?
+ this : new NullableSerializer<>(duplicateOriginalSerializer, padNullValue());
+ }
+
+ @Override
+ public T createInstance() {
+ return originalSerializer.createInstance();
+ }
+
+ @Override
+ public T copy(T from) {
+ return from == null ? null : originalSerializer.copy(from);
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return from == null ? null :
+ (reuse == null ? originalSerializer.copy(from) : originalSerializer.copy(from, reuse));
+ }
+
+ @Override
+ public int getLength() {
+ return padNullValue() ? 1 + padding.length : -1;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ if (record == null) {
+ target.writeBoolean(true);
+ target.write(padding);
+ } else {
+ target.writeBoolean(false);
+ originalSerializer.serialize(record, target);
+ }
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ boolean isNull = deserializeNull(source);
+ return isNull ? null : originalSerializer.deserialize(source);
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ boolean isNull = deserializeNull(source);
+ return isNull ? null : (reuse == null ?
+ originalSerializer.deserialize(source) : originalSerializer.deserialize(reuse, source));
+ }
+
+ private boolean deserializeNull(DataInputView source) throws IOException {
+ boolean isNull = source.readBoolean();
+ if (isNull) {
+ source.skipBytesToRead(padding.length);
+ }
+ return isNull;
+ }
+
+ /**
+ * Copies one serialized record from {@code source} to {@code target} without materializing it.
+ *
+ * @param source view to read the null flag from, followed by either the padding or the record
+ * @param target view to write the null flag to, followed by either the padding or the record
+ * @throws IOException if reading from {@code source} or writing to {@code target} fails
+ */
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ // Read the flag through deserializeNull so the padding that follows a null flag is skipped in the source.
+ // Reading only the flag would leave the padding unread and misalign every subsequent record.
+ boolean isNull = deserializeNull(source);
+ target.writeBoolean(isNull);
+ if (isNull) {
+ target.write(padding);
+ } else {
+ originalSerializer.copy(source, target);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ originalSerializer.equals(((NullableSerializer) obj).originalSerializer));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return (obj != null && obj.getClass() == getClass() &&
+ originalSerializer.canEqual(((NullableSerializer) obj).originalSerializer));
+ }
+
+ @Override
+ public int hashCode() {
+ return originalSerializer.hashCode();
+ }
+
+ @Override
+ public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
+ return new NullableSerializerConfigSnapshot<>(originalSerializer);
+ }
+
+ @Override
+ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+ ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+ CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ previousKvSerializersAndConfigs.get(0).f0,
+ UnloadableDummyTypeSerializer.class,
+ previousKvSerializersAndConfigs.get(0).f1,
+ originalSerializer);
+
+ if (!compatResult.isRequiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (compatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new NullableSerializer<>(
+ new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
+ }
+ }
+
+ return CompatibilityResult.requiresMigration();
+ }
+
+ /**
+ * Configuration snapshot for serializers of nullable types, containing the
+ * configuration snapshot of its original serializer.
+ */
+ @Internal
+ public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+ private static final int VERSION = 1;
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ @SuppressWarnings("unused")
+ public NullableSerializerConfigSnapshot() {}
+
+ NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
+ super(originalSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c7..1997866 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -32,6 +32,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.InstantiationUtil;
@@ -53,23 +54,23 @@ import org.junit.Test;
* internal state would be corrupt, which becomes evident when toString is called.
*/
public abstract class SerializerTestBase<T> extends TestLogger {
-
+
protected abstract TypeSerializer<T> createSerializer();
/**
* Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
- *
+ *
* <p>The expected length should be positive, for fix-length data types, or {@code -1} for
* variable-length types.
*/
protected abstract int getLength();
-
+
protected abstract Class<T> getTypeClass();
-
+
protected abstract T[] getTestData();
// --------------------------------------------------------------------------------------------
-
+
@Test
public void testInstantiate() {
try {
@@ -80,13 +81,13 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);
-
+
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
if (!type.isAssignableFrom(instance.getClass())) {
fail("Type of the instantiated object is wrong. " +
- "Expected Type: " + type + " present type " + instance.getClass());
+ "Expected Type: " + type + " present type " + instance.getClass());
}
}
catch (Exception e) {
@@ -127,7 +128,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
assertTrue(strategy.isRequiresMigration());
}
-
+
@Test
public void testGetLength() {
final int len = getLength();
@@ -146,16 +147,16 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testCopy() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
for (T datum : testData) {
T copy = serializer.copy(datum);
- copy.toString();
+ checkToString(copy);
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
@@ -165,16 +166,16 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testCopyIntoNewElements() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
for (T datum : testData) {
T copy = serializer.copy(datum, serializer.createInstance());
- copy.toString();
+ checkToString(copy);
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
@@ -184,18 +185,18 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testCopyIntoReusedElements() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
T target = serializer.createInstance();
-
+
for (T datum : testData) {
T copy = serializer.copy(datum, target);
- copy.toString();
+ checkToString(copy);
deepEquals("Copied element is not equal to the original element.", datum, copy);
target = copy;
}
@@ -206,25 +207,25 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testSerializeIndividually() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
TestInputView in = out.getInputView();
-
+
assertTrue("No data available during deserialization.", in.available() > 0);
-
+
T deserialized = serializer.deserialize(serializer.createInstance(), in);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", value, deserialized);
-
+
assertTrue("Trailing data available after deserialization.", in.available() == 0);
}
}
@@ -241,23 +242,23 @@ public abstract class SerializerTestBase<T> extends TestLogger {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
T reuseValue = serializer.createInstance();
-
+
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
TestInputView in = out.getInputView();
-
+
assertTrue("No data available during deserialization.", in.available() > 0);
-
+
T deserialized = serializer.deserialize(reuseValue, in);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", value, deserialized);
-
+
assertTrue("Trailing data available after deserialization.", in.available() == 0);
-
+
reuseValue = deserialized;
}
}
@@ -267,29 +268,29 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testSerializeAsSequenceNoReuse() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
-
+
TestInputView in = out.getInputView();
-
+
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(in);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
-
+
assertEquals("Wrong number of elements deserialized.", testData.length, num);
}
catch (Exception e) {
@@ -298,31 +299,31 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testSerializeAsSequenceReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
-
+
TestInputView in = out.getInputView();
T reuseValue = serializer.createInstance();
-
+
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(reuseValue, in);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
reuseValue = deserialized;
num++;
}
-
+
assertEquals("Wrong number of elements deserialized.", testData.length, num);
}
catch (Exception e) {
@@ -331,30 +332,30 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testSerializedCopyIndividually() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
for (T value : testData) {
TestOutputView out = new TestOutputView();
serializer.serialize(value, out);
-
+
TestInputView source = out.getInputView();
TestOutputView target = new TestOutputView();
serializer.copy(source, target);
-
+
TestInputView toVerify = target.getInputView();
-
+
assertTrue("No data available copying.", toVerify.available() > 0);
-
+
T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", value, deserialized);
-
+
assertTrue("Trailing data available after deserialization.", toVerify.available() == 0);
}
}
@@ -364,36 +365,36 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
-
+
+
@Test
public void testSerializedCopyAsSequence() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
-
+
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
-
+
TestInputView source = out.getInputView();
TestOutputView target = new TestOutputView();
for (int i = 0; i < testData.length; i++) {
serializer.copy(source, target);
}
-
+
TestInputView toVerify = target.getInputView();
int num = 0;
-
+
while (toVerify.available() > 0) {
T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
- deserialized.toString();
+ checkToString(deserialized);
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
-
+
assertEquals("Wrong number of elements copied.", testData.length, num);
}
catch (Exception e) {
@@ -402,7 +403,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
@Test
public void testSerializabilityAndEquals() {
try {
@@ -414,7 +415,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("The serializer is not serializable: " + e);
return;
}
-
+
assertEquals("The copy of the serializer is not equal to the original one.", ser1, ser2);
}
catch (Exception e) {
@@ -423,10 +424,26 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
-
+
+ @Test
+ public void testNullability() {
+ TypeSerializer<T> serializer = getSerializer();
+ try {
+ NullableSerializer.checkIfNullSupported(serializer);
+ } catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ fail("Unexpected failure of null value handling: " + t.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
-
+
protected void deepEquals(String message, T should, T is) {
+ Assert.assertTrue((should == null && is == null) || (should != null && is != null));
+ if (should == null) {
+ return;
+ }
if (should.getClass().isArray()) {
if (should instanceof boolean[]) {
Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is));
@@ -463,9 +480,9 @@ public abstract class SerializerTestBase<T> extends TestLogger {
assertEquals(message, should, is);
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
protected TypeSerializer<T> getSerializer() {
TypeSerializer<T> serializer = createSerializer();
if (serializer == null) {
@@ -473,7 +490,7 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
return serializer;
}
-
+
private T[] getData() {
T[] data = getTestData();
if (data == null) {
@@ -481,15 +498,15 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
return data;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
private static final class TestOutputView extends DataOutputStream implements DataOutputView {
-
+
public TestOutputView() {
super(new ByteArrayOutputStream(4096));
}
-
+
public TestInputView getInputView() {
ByteArrayOutputStream baos = (ByteArrayOutputStream) out;
return new TestInputView(baos.toByteArray());
@@ -509,8 +526,8 @@ public abstract class SerializerTestBase<T> extends TestLogger {
write(buffer);
}
}
-
-
+
+
private static final class TestInputView extends DataInputStream implements DataInputView {
public TestInputView(byte[] data) {
@@ -542,4 +559,10 @@ public abstract class SerializerTestBase<T> extends TestLogger {
return getClass().hashCode();
}
}
+
+ private static <T> void checkToString(T value) {
+ if (value != null) {
+ value.toString();
+ }
+ }
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
new file mode 100644
index 0000000..3bd176a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link NullableSerializer}. */
+@RunWith(Parameterized.class)
+public class NullableSerializerTest extends SerializerTestBase<Integer> {
+ private static final TypeSerializer<Integer> originalSerializer = IntSerializer.INSTANCE;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static List<Boolean> whetherToPadNullValue() {
+ return Arrays.asList(true, false);
+ }
+
+ @Parameterized.Parameter
+ public boolean padNullValue;
+
+ private TypeSerializer<Integer> nullableSerializer;
+
+ @Before
+ public void init() {
+ nullableSerializer = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+ }
+
+ @Override
+ protected TypeSerializer<Integer> createSerializer() {
+ return NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+ }
+
+ @Override
+ protected int getLength() {
+ return padNullValue ? 5 : -1;
+ }
+
+ @Override
+ protected Class<Integer> getTypeClass() {
+ return Integer.class;
+ }
+
+ @Override
+ protected Integer[] getTestData() {
+ return new Integer[] { 5, -1, 0, null };
+ }
+
+ @Test
+ public void testWrappingNotNeeded() {
+ assertEquals(NullableSerializer.wrapIfNullIsNotSupported(StringSerializer.INSTANCE, padNullValue), StringSerializer.INSTANCE);
+ }
+
+ @Test
+ public void testWrappingNeeded() {
+ assertTrue(nullableSerializer instanceof NullableSerializer);
+ assertEquals(NullableSerializer.wrapIfNullIsNotSupported(nullableSerializer, padNullValue), nullableSerializer);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 268f84a..3b0a99f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -81,6 +81,14 @@ abstract class AbstractTtlDecorator<T> {
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
+ TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
+ return ttlValue == null ? null : ttlValue.getUserValue();
+ }
+
+ <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
+ SupplierWithException<TtlValue<V>, SE> getter,
+ ThrowingConsumer<TtlValue<V>, CE> updater,
+ ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getter.get();
if (ttlValue == null) {
return null;
@@ -92,6 +100,6 @@ abstract class AbstractTtlDecorator<T> {
} else if (updateTsOnRead) {
updater.accept(rewrapWithNewTs(ttlValue));
}
- return ttlValue.getUserValue();
+ return ttlValue;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 160dbeb..f84a2ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -54,7 +54,13 @@ class TtlMapState<K, N, UK, UV>
@Override
public UV get(UK key) throws Exception {
- return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+ TtlValue<UV> ttlValue = getWrapped(key);
+ return ttlValue == null ? null : ttlValue.getUserValue();
+ }
+
+ private TtlValue<UV> getWrapped(UK key) throws Exception {
+ return getWrappedWithTtlCheckAndUpdate(
+ () -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
}
@Override
@@ -83,7 +89,8 @@ class TtlMapState<K, N, UK, UV>
@Override
public boolean contains(UK key) throws Exception {
- return get(key) != null;
+ TtlValue<UV> ttlValue = getWrapped(key);
+ return ttlValue != null;
}
@Override
@@ -161,16 +168,16 @@ class TtlMapState<K, N, UK, UV>
}
private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
- UV unexpiredValue;
+ TtlValue<UV> unexpiredValue;
try {
- unexpiredValue = getWithTtlCheckAndUpdate(
+ unexpiredValue = getWrappedWithTtlCheckAndUpdate(
e::getValue,
v -> original.put(e.getKey(), v),
originalIterator::remove);
} catch (Exception ex) {
throw new FlinkRuntimeException(ex);
}
- return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+ return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue.getUserValue());
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 773fe7c..be231c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -44,6 +44,6 @@ class TtlUtils {
}
static <V> TtlValue<V> wrapWithTs(V value, long ts) {
- return value == null ? null : new TtlValue<>(value, ts);
+ return new TtlValue<>(value, ts);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index a8bcadf..48435d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.state.ttl;
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.io.Serializable;
@@ -30,15 +30,16 @@ import java.io.Serializable;
class TtlValue<T> implements Serializable {
private static final long serialVersionUID = 5221129704201125020L;
+ @Nullable
private final T userValue;
private final long lastAccessTimestamp;
- TtlValue(T userValue, long lastAccessTimestamp) {
- Preconditions.checkNotNull(userValue);
+ TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
this.userValue = userValue;
this.lastAccessTimestamp = lastAccessTimestamp;
}
+ @Nullable
T getUserValue() {
return userValue;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
index 7fd61aa..7294b4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -35,9 +36,9 @@ class TtlMapStateAllEntriesTestContext extends
void initTestValues() {
emptyValue = Collections.emptySet();
- updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
- updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
- updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+ updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(23, null), Tuple2.of(10, "10"));
+ updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(24, null), Tuple2.of(7, "7"));
+ updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(25, null), Tuple2.of(4, "4"));
getUpdateEmpty = updateEmpty.entrySet();
getUnexpired = updateUnexpired.entrySet();
@@ -46,7 +47,9 @@ class TtlMapStateAllEntriesTestContext extends
@SafeVarargs
private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
- return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+ Map<UK, UV> result = new HashMap<>(); // plain HashMap: Collectors.toMap throws NPE on the null values these fixtures contain
+ Arrays.asList(entries).forEach(entry -> result.put(entry.f0, entry.f1));
+ return result;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
index fb025af..a77c8ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -43,7 +43,10 @@ class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, St
@Override
String get() throws Exception {
- return ttlState.get(TEST_KEY);
+ String value = ttlState.get(TEST_KEY);
+ // contains() must agree with whether getOriginal() currently yields an entry for TEST_KEY
+ assert (getOriginal() != null) == ttlState.contains(TEST_KEY);
+ return value;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
index a8bcadf..6f8c70b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
@@ -18,32 +18,15 @@
package org.apache.flink.runtime.state.ttl;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * This class wraps user value of state with TTL.
- *
- * @param <T> Type of the user value of state with TTL
- */
-class TtlValue<T> implements Serializable {
- private static final long serialVersionUID = 5221129704201125020L;
-
- private final T userValue;
- private final long lastAccessTimestamp;
-
- TtlValue(T userValue, long lastAccessTimestamp) {
- Preconditions.checkNotNull(userValue);
- this.userValue = userValue;
- this.lastAccessTimestamp = lastAccessTimestamp;
- }
-
- T getUserValue() {
- return userValue;
- }
-
- long getLastAccessTimestamp() {
- return lastAccessTimestamp;
+class TtlMapStatePerNullElementTestContext extends TtlMapStatePerElementTestContext {
+ @Override
+ void initTestValues() {
+ updateEmpty = null;
+ updateUnexpired = null;
+ updateExpired = null;
+
+ getUpdateEmpty = null;
+ getUnexpired = null;
+ getUpdateExpired = null;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 6b3a15f..f9f108a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -69,6 +69,7 @@ public abstract class TtlStateTestBase {
new TtlListStateTestContext(),
new TtlMapStateAllEntriesTestContext(),
new TtlMapStatePerElementTestContext(),
+ new TtlMapStatePerNullElementTestContext(),
new TtlAggregatingStateTestContext(),
new TtlReducingStateTestContext(),
new TtlFoldingStateTestContext());