Posted to commits@flink.apache.org by al...@apache.org on 2020/06/03 13:06:28 UTC

[flink] 01/04: [FLINK-17376] Use JavaSerializer instead of getSerializableListState()

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dc6181667ac012afc0c68cfef825ec7bb30a7589
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu May 28 15:10:21 2020 +0200

    [FLINK-17376] Use JavaSerializer instead of getSerializableListState()
    
    We do this because we want to deprecate that method. We will have to get
    rid of Java serialization completely soon, though.
---
 .../connectors/fs/bucketing/BucketingSink.java       |  8 +++++++-
 .../fs/bucketing/BucketingSinkMigrationTest.java     |  8 +++++++-
 .../source/ContinuousFileReaderOperator.java         |  8 +++++++-
 .../source/MessageAcknowledgingSourceBase.java       |  8 +++++++-
 .../runtime/operators/GenericWriteAheadSink.java     | 11 +++++++++--
 .../util/functions/StreamingFunctionUtils.java       | 20 ++++++++++++++++----
 6 files changed, 53 insertions(+), 10 deletions(-)
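
Every file in this commit makes the same substitution: the deprecated
OperatorStateStore#getSerializableListState(String) call is replaced by
getListState(ListStateDescriptor) with an explicit JavaSerializer, which keeps the
Java-serialization state format and therefore stays readable from savepoints written by
earlier versions. Below is a minimal sketch of that pattern in an illustrative
CheckpointedFunction; MyStatefulSink and the "buffered-elements" state name are
hypothetical and not part of this commit.

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.OperatorStateStore;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.runtime.state.JavaSerializer;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    public class MyStatefulSink<T extends Serializable> extends RichSinkFunction<T>
            implements CheckpointedFunction {

        private transient ListState<T> checkpointedState;
        private final List<T> buffer = new ArrayList<>();

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            OperatorStateStore stateStore = context.getOperatorStateStore();

            // Old, deprecated form:
            // checkpointedState = stateStore.getSerializableListState("buffered-elements");

            // New form: the same Java-serialization format, declared explicitly.
            checkpointedState = stateStore.getListState(
                    new ListStateDescriptor<>("buffered-elements", new JavaSerializer<>()));

            if (context.isRestored()) {
                for (T element : checkpointedState.get()) {
                    buffer.add(element);
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Re-write the buffered elements into operator state on every checkpoint.
            checkpointedState.clear();
            for (T element : buffer) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void invoke(T value, Context context) {
            buffer.add(value);
        }
    }

The explicit descriptor makes the dependency on Java serialization visible at the call
site, which is what allows the convenience method to be deprecated without breaking
restore compatibility with old state.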

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 78cefaf..ad598a2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -387,8 +389,12 @@ public class BucketingSink<T>
 			this.refTruncate = reflectTruncate(fs);
 		}
 
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
 		OperatorStateStore stateStore = context.getOperatorStateStore();
-		restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+		this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
 		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 5bd0fcf..cfa68dc 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -204,7 +206,11 @@ public class BucketingSinkMigrationTest {
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 			OperatorStateStore stateStore = context.getOperatorStateStore();
 
-			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
 			if (context.isRestored()) {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 87a028c..1b050f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -243,7 +245,11 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
 		checkState(checkpointedState == null, "The reader state has already been initialized.");
 
-		checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
+		checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (!context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 3a2a5ca..5b99194 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -138,9 +140,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 		Preconditions.checkState(this.checkpointedState == null,
 			"The " + getClass().getSimpleName() + " has already been initialized.");
 
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
 		this.checkpointedState = context
 			.getOperatorStateStore()
-			.getSerializableListState("message-acknowledging-source-state");
+			.getListState(new ListStateDescriptor<>("message-acknowledging-source-state", new JavaSerializer<>()));
 
 		this.idsForCurrentCheckpoint = new HashSet<>(64);
 		this.pendingCheckpoints = new ArrayDeque<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4ad0fc6..889cec1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -26,6 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -92,8 +94,13 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 		Preconditions.checkState(this.checkpointedState == null,
 			"The reader state has already been initialized.");
 
-		checkpointedState = context.getOperatorStateStore()
-			.getSerializableListState("pending-checkpoints");
+		// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+		// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+		// directly on flink-runtime. We are doing it here because we need to maintain backwards
+		// compatibility with old state and because we will have to rework/remove this code soon.
+		checkpointedState = context
+							.getOperatorStateStore()
+							.getListState(new ListStateDescriptor<>("pending-checkpoints", new JavaSerializer<>()));
 
 		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
 		if (context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
index 4482431..d9ea561 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -125,8 +127,13 @@ public final class StreamingFunctionUtils {
 			List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
 					snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
 
-			ListState<Serializable> listState = backend.
-					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListStateDescriptor<Serializable> listStateDescriptor =
+				new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+			ListState<Serializable> listState = backend.getListState(listStateDescriptor);
 
 			listState.clear();
 
@@ -184,8 +191,13 @@ public final class StreamingFunctionUtils {
 			@SuppressWarnings("unchecked")
 			ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
 
-			ListState<Serializable> listState = context.getOperatorStateStore().
-					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+			// We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+			// we shouldn't be doing it because ideally nothing in the API modules/connector depends
+			// directly on flink-runtime. We are doing it here because we need to maintain backwards
+			// compatibility with old state and because we will have to rework/remove this code soon.
+			ListStateDescriptor<Serializable> listStateDescriptor =
+				new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+			ListState<Serializable> listState = context.getOperatorStateStore().getListState(listStateDescriptor);
 
 			List<Serializable> list = new ArrayList<>();
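
For context, the StreamingFunctionUtils changes above serve user functions that implement
the ListCheckpointed interface: the list returned from snapshotState() is written into the
default operator list state, which is now declared with an explicit JavaSerializer as well.
A minimal sketch of such a user function follows; CountingSource is illustrative and not
part of this commit.

    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Collections;
    import java.util.List;

    public class CountingSource implements SourceFunction<Long>, ListCheckpointed<Long> {

        private volatile boolean running = true;
        private long count;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                // Emit and update state under the checkpoint lock for consistency.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) {
            // StreamingFunctionUtils writes this list into the default operator list state,
            // now declared with JavaSerializer instead of getSerializableListState().
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Long> state) {
            // Sum the redistributed partial counts on restore/rescale.
            for (Long c : state) {
                count += c;
            }
        }
    }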