You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/06/03 13:06:28 UTC
[flink] 01/04: [FLINK-17376] Use JavaSerializer instead of getSerializableListState()
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dc6181667ac012afc0c68cfef825ec7bb30a7589
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu May 28 15:10:21 2020 +0200
[FLINK-17376] Use JavaSerializer instead of getSerializableListState()
We do this because we want to deprecate getSerializableListState(). We will
soon have to get rid of Java serialization (JavaSerializer) completely, though.
---
.../connectors/fs/bucketing/BucketingSink.java | 8 +++++++-
.../fs/bucketing/BucketingSinkMigrationTest.java | 8 +++++++-
.../source/ContinuousFileReaderOperator.java | 8 +++++++-
.../source/MessageAcknowledgingSourceBase.java | 8 +++++++-
.../runtime/operators/GenericWriteAheadSink.java | 11 +++++++++--
.../util/functions/StreamingFunctionUtils.java | 20 ++++++++++++++++----
6 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 78cefaf..ad598a2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -387,8 +389,12 @@ public class BucketingSink<T>
this.refTruncate = reflectTruncate(fs);
}
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
OperatorStateStore stateStore = context.getOperatorStateStore();
- restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+ this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 5bd0fcf..cfa68dc 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -19,10 +19,12 @@
package org.apache.flink.streaming.connectors.fs.bucketing;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -204,7 +206,11 @@ public class BucketingSinkMigrationTest {
public void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
- ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
+ ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
if (context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 87a028c..1b050f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -243,7 +245,11 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
checkState(checkpointedState == null, "The reader state has already been initialized.");
- checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
+ checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
if (!context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 3a2a5ca..5b99194 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Preconditions;
@@ -138,9 +140,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
Preconditions.checkState(this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
this.checkpointedState = context
.getOperatorStateStore()
- .getSerializableListState("message-acknowledging-source-state");
+ .getListState(new ListStateDescriptor<>("message-acknowledging-source-state", new JavaSerializer<>()));
this.idsForCurrentCheckpoint = new HashSet<>(64);
this.pendingCheckpoints = new ArrayDeque<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4ad0fc6..889cec1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -26,6 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -92,8 +94,13 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
Preconditions.checkState(this.checkpointedState == null,
"The reader state has already been initialized.");
- checkpointedState = context.getOperatorStateStore()
- .getSerializableListState("pending-checkpoints");
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
+ checkpointedState = context
+ .getOperatorStateStore()
+ .getListState(new ListStateDescriptor<>("pending-checkpoints", new JavaSerializer<>()));
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
if (context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
index 4482431..d9ea561 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -125,8 +127,13 @@ public final class StreamingFunctionUtils {
List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
- ListState<Serializable> listState = backend.
- getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
+ ListStateDescriptor<Serializable> listStateDescriptor =
+ new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+ ListState<Serializable> listState = backend.getListState(listStateDescriptor);
listState.clear();
@@ -184,8 +191,13 @@ public final class StreamingFunctionUtils {
@SuppressWarnings("unchecked")
ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
- ListState<Serializable> listState = context.getOperatorStateStore().
- getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+ // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+ // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+ // directly on flink-runtime. We are doing it here because we need to maintain backwards
+ // compatibility with old state and because we will have to rework/remove this code soon.
+ ListStateDescriptor<Serializable> listStateDescriptor =
+ new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+ ListState<Serializable> listState = context.getOperatorStateStore().getListState(listStateDescriptor);
List<Serializable> list = new ArrayList<>();