You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/18 16:48:43 UTC
[flink] branch master updated: [FLINK-19503][state] Add
StateChangelog API
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4c402a3 [FLINK-19503][state] Add StateChangelog API
4c402a3 is described below
commit 4c402a3ad6d5f9c5378d222f41d92d129b5cbcfa
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Oct 6 20:38:45 2020 +0200
[FLINK-19503][state] Add StateChangelog API
---
.../runtime/state/changelog/SequenceNumber.java | 83 ++++++++++++
.../flink/runtime/state/changelog/StateChange.java | 48 +++++++
.../state/changelog/StateChangelogHandle.java | 35 +++++
.../changelog/StateChangelogHandleStreamImpl.java | 149 +++++++++++++++++++++
.../state/changelog/StateChangelogWriter.java | 77 +++++++++++
.../changelog/StateChangelogWriterFactory.java | 37 +++++
.../StateChangelogWriterFactoryLoader.java | 43 ++++++
.../inmemory/InMemoryStateChangelogHandle.java | 80 +++++++++++
.../inmemory/InMemoryStateChangelogWriter.java | 100 ++++++++++++++
.../InMemoryStateChangelogWriterFactory.java | 33 +++++
...ime.state.changelog.StateChangelogWriterFactory | 16 +++
.../StateChangelogWriterFactoryLoaderTest.java | 66 +++++++++
.../inmemory/StateChangelogWriterFactoryTest.java | 135 +++++++++++++++++++
13 files changed, 902 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
new file mode 100644
index 0000000..c6556fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A logical timestamp to draw a boundary between the materialized and non-materialized changes.
+ * Maintained by the state backend but implementations may choose to move its generation to {@link
+ * StateChangelogWriterFactory} as an optimization.
+ */
+@Internal
+public interface SequenceNumber extends Comparable<SequenceNumber> {
+
+    /** Creates a {@link GenericSequenceNumber} wrapping the given value. */
+    static SequenceNumber of(long number) {
+        return new GenericSequenceNumber(number);
+    }
+
+    /** Returns the sequence number that follows this one. */
+    SequenceNumber next();
+
+    /** Generic {@link SequenceNumber} backed by a non-negative {@code long}. */
+    final class GenericSequenceNumber implements SequenceNumber {
+        public final long number;
+
+        GenericSequenceNumber(long number) {
+            // Negative values are rejected up front.
+            Preconditions.checkArgument(number >= 0);
+            this.number = number;
+        }
+
+        /** Produces {@code number + 1}; guarded so that the value cannot overflow. */
+        @Override
+        public SequenceNumber next() {
+            checkState(number < Long.MAX_VALUE);
+            return new GenericSequenceNumber(number + 1);
+        }
+
+        /** Orders by the wrapped value; only other generic sequence numbers are accepted. */
+        @Override
+        public int compareTo(SequenceNumber o) {
+            Preconditions.checkArgument(o instanceof GenericSequenceNumber);
+            GenericSequenceNumber that = (GenericSequenceNumber) o;
+            return Long.compare(number, that.number);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return this == o
+                    || (o instanceof GenericSequenceNumber
+                            && ((GenericSequenceNumber) o).number == number);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(number);
+        }
+
+        @Override
+        public String toString() {
+            return Long.toString(number);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
new file mode 100644
index 0000000..7ae60f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/** Change of state of a keyed operator. Used for generic incremental checkpoints. */
+@Internal
+public class StateChange {
+
+    private final int keyGroup;
+    private final byte[] change;
+
+    /**
+     * Creates a change for the given key group.
+     *
+     * @param keyGroup key group of the change, must not be negative
+     * @param change serialized change, must not be null; the array is stored as is (no copy)
+     */
+    public StateChange(int keyGroup, byte[] change) {
+        // The key group is validated before the payload, so an invalid key group wins
+        // when both arguments are bad.
+        Preconditions.checkArgument(keyGroup >= 0);
+        Preconditions.checkNotNull(change);
+        this.keyGroup = keyGroup;
+        this.change = change;
+    }
+
+    public int getKeyGroup() {
+        return keyGroup;
+    }
+
+    /** Returns the stored array itself, not a copy. */
+    public byte[] getChange() {
+        return change;
+    }
+
+    @Override
+    public String toString() {
+        final int dataSize = change.length;
+        return String.format("keyGroup=%d, dataSize=%d", keyGroup, dataSize);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
new file mode 100644
index 0000000..a4a8656
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.IOException;
+
+/**
+ * A handle to saved {@link StateChange state changes}.
+ *
+ * @param <ReaderContext> type of context used while reading (on TM).
+ */
+@Internal
+public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle {
+
+ /**
+ * Returns an iterator over the {@link StateChange state changes} this handle refers to. The
+ * iterator is closeable; callers should close it once they are done with it.
+ *
+ * @param context implementation-specific context used while reading
+ * @return iterator over the saved changes
+ * @throws IOException declared for implementations whose reads can fail
+ */
+ CloseableIterator<StateChange> getChanges(ReaderContext context) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
new file mode 100644
index 0000000..a8ab78f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}.
+ *
+ * <p>The handle is an ordered list of (stream handle, offset) pairs plus the key group range it
+ * covers. The underlying stream handles are registered with the {@link SharedStateRegistry}.
+ */
+@Internal
+public final class StateChangelogHandleStreamImpl
+        implements StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
+    private static final long serialVersionUID = -8070326169926626355L;
+
+    private final KeyGroupRange keyGroupRange;
+    /** NOTE: order is important as it reflects the order of changes. */
+    private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
+
+    /** Set by {@link #registerSharedStates}; not serialized. */
+    private transient SharedStateRegistry stateRegistry;
+
+    /**
+     * @param handlesAndOffsets stream handles with the offsets to read from, in change order
+     * @param keyGroupRange key groups covered by this handle
+     */
+    public StateChangelogHandleStreamImpl(
+            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange) {
+        this.handlesAndOffsets = handlesAndOffsets;
+        this.keyGroupRange = keyGroupRange;
+    }
+
+    /** Remembers the registry and registers a reference for every underlying stream handle. */
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        this.stateRegistry = stateRegistry;
+        handlesAndOffsets.forEach(
+                handleAndOffset ->
+                        stateRegistry.registerReference(
+                                getKey(handleAndOffset.f0), handleAndOffset.f0));
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    /**
+     * Returns a handle restricted to the key groups shared by this handle and the given range.
+     *
+     * @param keyGroupRange range to intersect this handle's range with
+     * @return a handle over the same streams limited to the common key groups, or {@code null}
+     *     if the ranges have no key group in common
+     */
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        // The parameter shadows the field: intersect the handle's own range with the argument
+        // (intersecting the argument with itself would simply echo the requested range).
+        KeyGroupRange offsets = this.keyGroupRange.getIntersection(keyGroupRange);
+        if (offsets.getNumberOfKeyGroups() == 0) {
+            return null;
+        }
+        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets);
+    }
+
+    /**
+     * Returns an iterator that reads the underlying streams one after another, in list order,
+     * opening each stream lazily through the given reader.
+     *
+     * @param reader used to open each (handle, offset) pair
+     */
+    @Override
+    public CloseableIterator<StateChange> getChanges(StateChangeStreamReader reader) {
+        return new CloseableIterator<StateChange>() {
+            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator =
+                    handlesAndOffsets.iterator();
+
+            private CloseableIterator<StateChange> current = CloseableIterator.empty();
+
+            @Override
+            public boolean hasNext() {
+                advance();
+                return current.hasNext();
+            }
+
+            @Override
+            public StateChange next() {
+                advance();
+                return current.next();
+            }
+
+            /** Moves on to the next stream until one with remaining elements is found. */
+            private void advance() {
+                while (!current.hasNext() && handleIterator.hasNext()) {
+                    Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next();
+                    try {
+                        current = reader.read(tuple2.f0, tuple2.f1);
+                    } catch (IOException e) {
+                        ExceptionUtils.rethrow(e);
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws Exception {
+                current.close();
+            }
+        };
+    }
+
+    /**
+     * Unregisters the references taken in {@link #registerSharedStates}.
+     *
+     * <p>NOTE(review): {@code stateRegistry} is only assigned in {@link #registerSharedStates},
+     * so calling this method first would fail on a null registry - confirm callers' ordering.
+     */
+    @Override
+    public void discardState() {
+        handlesAndOffsets.forEach(
+                handleAndOffset -> stateRegistry.unregisterReference(getKey(handleAndOffset.f0)));
+    }
+
+    /** Always reports 0; the size of the underlying streams is not accounted for here. */
+    @Override
+    public long getStateSize() {
+        return 0;
+    }
+
+    private static SharedStateRegistryKey getKey(StreamStateHandle stateHandle) {
+        // StateHandle key used in SharedStateRegistry should only be based on the file name
+        // and not on backend UUID or keygroup (multiple handles can refer to the same file and
+        // making keys unique will effectively disable sharing)
+        if (stateHandle instanceof FileStateHandle) {
+            return new SharedStateRegistryKey(
+                    ((FileStateHandle) stateHandle).getFilePath().toString());
+        } else if (stateHandle instanceof ByteStreamStateHandle) {
+            return new SharedStateRegistryKey(
+                    ((ByteStreamStateHandle) stateHandle).getHandleName());
+        } else {
+            return new SharedStateRegistryKey(
+                    Integer.toString(System.identityHashCode(stateHandle)));
+        }
+    }
+
+    /** Reads {@link StateChange state changes} from a stream handle starting at an offset. */
+    public interface StateChangeStreamReader {
+        CloseableIterator<StateChange> read(StreamStateHandle handle, long offset)
+                throws IOException;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
new file mode 100644
index 0000000..a206d3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */
+@Internal
+public interface StateChangelogWriter<Handle extends StateChangelogHandle<?>>
+ extends AutoCloseable {
+
+ /**
+ * Get {@link SequenceNumber} of the last element added by {@link #append(int, byte[]) append}.
+ */
+ SequenceNumber lastAppendedSequenceNumber();
+
+ /**
+ * Appends the provided data to this log. No persistency guarantees.
+ *
+ * @param keyGroup key group the change belongs to
+ * @param value serialized change
+ */
+ void append(int keyGroup, byte[] value);
+
+ /**
+ * Durably persist previously {@link #append(int, byte[]) appended} data starting from the
+ * provided {@link SequenceNumber} and up to the latest change added. After this call, one of
+ * {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber,
+ * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be
+ * called for the corresponding change set.
+ *
+ * @param from inclusive
+ * @return a future that provides the handle to the persisted changes
+ * @throws IOException declared for implementations whose persistence can fail
+ */
+ CompletableFuture<Handle> persist(SequenceNumber from) throws IOException;
+
+ /**
+ * Truncate this state changelog to free up resources. Called upon state materialization. Any
+ * {@link #persist(SequenceNumber) persisted} state changes will be discarded unless previously
+ * {@link #confirm confirmed}.
+ *
+ * @param to exclusive
+ */
+ void truncate(SequenceNumber to);
+
+ /**
+ * Mark the given state changes as confirmed by the JM.
+ *
+ * @param from inclusive
+ * @param to exclusive
+ */
+ void confirm(SequenceNumber from, SequenceNumber to);
+
+ /**
+ * Reset the state of the given state changes. Called upon abortion so that, if requested
+ * later, these changes will be re-uploaded.
+ *
+ * @param from start of the range; NOTE(review): presumably inclusive as in {@link #confirm},
+ * to be confirmed
+ * @param to end of the range; NOTE(review): presumably exclusive as in {@link #confirm}, to be
+ * confirmed
+ */
+ void reset(SequenceNumber from, SequenceNumber to);
+
+ /**
+ * Close this log. No new appends will be possible. Any appended but not persisted records will
+ * be lost.
+ */
+ void close();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
new file mode 100644
index 0000000..98e1479
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * {@link StateChangelogWriter} factory. Scoped to a single entity (e.g. a SubTask or
+ * OperatorCoordinator). Please use {@link StateChangelogWriterFactoryLoader} to obtain an instance.
+ *
+ * @param <Handle> type of the handle produced by the writers created by this factory
+ */
+@Internal
+public interface StateChangelogWriterFactory<Handle extends StateChangelogHandle<?>>
+ extends AutoCloseable {
+
+ /**
+ * Creates a writer for the given operator and key group range.
+ *
+ * @param operatorID operator the writer is created for
+ * @param keyGroupRange key group range the writer is created for
+ * @return a new {@link StateChangelogWriter}
+ */
+ StateChangelogWriter<Handle> createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
+
+ /** Does nothing by default; implementations may override it to release resources. */
+ @Override
+ default void close() throws Exception {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
new file mode 100644
index 0000000..8e0bffa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.plugin.PluginManager;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat;
+
+/** A thin wrapper around {@link PluginManager} to load {@link StateChangelogWriterFactory}. */
+@Internal
+public class StateChangelogWriterFactoryLoader {
+    private final PluginManager pluginManager;
+
+    public StateChangelogWriterFactoryLoader(PluginManager pluginManager) {
+        this.pluginManager = pluginManager;
+    }
+
+    /**
+     * Loads the available factories: those provided by the plugin manager come first, followed
+     * by those discovered through {@link ServiceLoader}.
+     *
+     * @return iterator over all discovered factories
+     */
+    @SuppressWarnings({"rawtypes"})
+    public Iterator<StateChangelogWriterFactory> load() {
+        Iterator<StateChangelogWriterFactory> fromPlugins =
+                pluginManager.load(StateChangelogWriterFactory.class);
+        Iterator<StateChangelogWriterFactory> fromServiceLoader =
+                ServiceLoader.load(StateChangelogWriterFactory.class).iterator();
+        return concat(fromPlugins, fromServiceLoader);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
new file mode 100644
index 0000000..e43a2ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+/** {@link StateChangelogHandle} that keeps the changes in memory, grouped by key group. */
+class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Serialized changes per key group. */
+    private final Map<Integer, List<byte[]>> changes;
+
+    public InMemoryStateChangelogHandle(Map<Integer, List<byte[]>> changes) {
+        this.changes = changes;
+    }
+
+    /** Flattens the per-key-group lists into one list and exposes it as an iterator. */
+    @Override
+    public CloseableIterator<StateChange> getChanges(Void unused) {
+        List<StateChange> flattened =
+                changes.entrySet().stream().flatMap(this::toStateChanges).collect(toList());
+        // Nothing to release per element, hence the no-op consumer.
+        return CloseableIterator.fromList(flattened, ignored -> {});
+    }
+
+    /** Wraps every payload of one key group into a {@link StateChange}. */
+    private Stream<StateChange> toStateChanges(Map.Entry<Integer, List<byte[]>> entry) {
+        final int keyGroup = entry.getKey();
+        final List<byte[]> payloads = entry.getValue();
+        return payloads.stream().map(payload -> new StateChange(keyGroup, payload));
+    }
+
+    /** Nothing to discard. */
+    @Override
+    public void discardState() {}
+
+    /** Always reports 0. */
+    @Override
+    public long getStateSize() {
+        return 0;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
new file mode 100644
index 0000000..5ab68e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * {@link StateChangelogWriter} that keeps all appended changes in memory, per key group and
+ * ordered by {@link SequenceNumber}. {@link #confirm} and {@link #reset} are not supported.
+ */
+@NotThreadSafe
+class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
+
+    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup =
+            new HashMap<>();
+    /** Sequence number of the last appended change; 0 while nothing has been appended. */
+    private long sqn = 0L;
+    private boolean closed;
+
+    /** Stores the change under the next sequence number; fails if the writer is closed. */
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        Preconditions.checkState(!closed, "LogWriter is closed");
+        LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
+        changesByKeyGroup
+                .computeIfAbsent(keyGroup, unused -> new TreeMap<>())
+                .put(SequenceNumber.of(++sqn), value);
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        return SequenceNumber.of(sqn);
+    }
+
+    /**
+     * Returns an already completed future with a handle holding a copy of all changes whose
+     * sequence number is at or after {@code from}.
+     *
+     * @param from inclusive lower bound, must not be null
+     */
+    @Override
+    public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) {
+        // Validate before logging so that an invalid call leaves no misleading log line.
+        Preconditions.checkNotNull(from);
+        LOG.debug("Persist after {}", from);
+        return completedFuture(new InMemoryStateChangelogHandle(collectChanges(from)));
+    }
+
+    /** Copies, per key group, the changes with a sequence number at or after {@code from}. */
+    private Map<Integer, List<byte[]>> collectChanges(SequenceNumber from) {
+        return changesByKeyGroup.entrySet().stream()
+                .collect(
+                        toMap(
+                                Map.Entry::getKey,
+                                kv ->
+                                        new ArrayList<>(
+                                                kv.getValue().tailMap(from, true).values())));
+    }
+
+    /** Marks the writer as closed; closing it a second time fails the state check. */
+    @Override
+    public void close() {
+        Preconditions.checkState(!closed);
+        closed = true;
+    }
+
+    /**
+     * Drops all changes with a sequence number strictly below {@code before} so that the memory
+     * they occupy can be reclaimed. Handles returned earlier by {@link #persist} hold copies and
+     * are not affected.
+     *
+     * @param before exclusive upper bound of the changes to drop
+     */
+    @Override
+    public void truncate(SequenceNumber before) {
+        // headMap(..., false) is a live view of the entries below the bound; clearing it
+        // removes them from the backing per-key-group map.
+        changesByKeyGroup.forEach((keyGroup, changes) -> changes.headMap(before, false).clear());
+    }
+
+    @Override
+    public void confirm(SequenceNumber from, SequenceNumber to) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset(SequenceNumber from, SequenceNumber to) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
new file mode 100644
index 0000000..7196545
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+
+/** An in-memory (non-production) implementation of {@link StateChangelogWriterFactory}. */
+public class InMemoryStateChangelogWriterFactory
+ implements StateChangelogWriterFactory<InMemoryStateChangelogHandle> {
+
+ /**
+ * Creates a new, independent {@link InMemoryStateChangelogWriter} on every call. Neither
+ * argument is used by this implementation.
+ */
+ @Override
+ public InMemoryStateChangelogWriter createWriter(
+ OperatorID operatorID, KeyGroupRange keyGroupRange) {
+ return new InMemoryStateChangelogWriter();
+ }
+}
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
new file mode 100644
index 0000000..bf1a4d0
--- /dev/null
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
new file mode 100644
index 0000000..88d6f03
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactoryLoader;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableList.copyOf;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link StateChangelogWriterFactoryLoader}. */
+public class StateChangelogWriterFactoryLoaderTest {
+
+    /**
+     * With a plugin manager that provides no factories, the loader must still return at least one
+     * factory (presumably the one registered through META-INF/services - confirm against the
+     * loader implementation).
+     */
+    @Test
+    public void testLoadSpiImplementation() {
+        PluginManager noPlugins = getPluginManager(emptyIterator());
+        StateChangelogWriterFactoryLoader loader =
+                new StateChangelogWriterFactoryLoader(noPlugins);
+
+        assertTrue(loader.load().hasNext());
+    }
+
+    /** A factory handed out by the plugin manager must be among the loaded factories. */
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testLoadPluginImplementation() {
+        StateChangelogWriterFactory<?> pluginFactory = new InMemoryStateChangelogWriterFactory();
+        StateChangelogWriterFactoryLoader loader =
+                new StateChangelogWriterFactoryLoader(
+                        getPluginManager(singletonList(pluginFactory).iterator()));
+
+        Iterator<StateChangelogWriterFactory> loaded = loader.load();
+
+        assertTrue(copyOf(loaded).contains(pluginFactory));
+    }
+
+    /**
+     * Builds a {@link PluginManager} stub whose {@code load} returns the given iterator.
+     *
+     * <p>The stub rejects any service class other than {@link StateChangelogWriterFactory}. The
+     * same iterator instance is returned on every call, so it can be consumed only once.
+     */
+    private PluginManager getPluginManager(
+            Iterator<? extends StateChangelogWriterFactory<?>> iterator) {
+        return new PluginManager() {
+
+            @Override
+            public <P> Iterator<P> load(Class<P> service) {
+                checkArgument(service.equals(StateChangelogWriterFactory.class));
+                // The argument check above pins P to StateChangelogWriterFactory.
+                @SuppressWarnings("unchecked")
+                Iterator<P> typed = (Iterator<P>) iterator;
+                return typed;
+            }
+        };
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
new file mode 100644
index 0000000..8887525
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** {@link InMemoryStateChangelogWriterFactory} test. */
+public class StateChangelogWriterFactoryTest {
+
+    /** Source of the random payloads; unseeded, so the generated data differs between runs. */
+    private final Random random = new Random();
+
+    // NOTE(review): no test in this class references this rule - confirm whether it is needed.
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    /** Appending to a writer after it has been closed must throw IllegalStateException. */
+    @Test(expected = IllegalStateException.class)
+    public void testNoAppendAfterClose() {
+        StateChangelogWriter<?> writer =
+                getFactory().createWriter(new OperatorID(), KeyGroupRange.of(0, 0));
+        writer.close();
+        writer.append(0, new byte[0]);
+    }
+
+    /**
+     * Changes appended per key group must be returned, in the same order, by the handle obtained
+     * from {@code persist}.
+     */
+    @Test
+    public void testWriteAndRead() throws Exception {
+        KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
+        Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20);
+
+        try (StateChangelogWriterFactory<?> client = getFactory();
+                StateChangelogWriter<?> writer = client.createWriter(new OperatorID(), kgRange)) {
+            // Captured before any append; its successor is what gets handed to persist().
+            SequenceNumber prev = writer.lastAppendedSequenceNumber();
+            appendsByKeyGroup.forEach(
+                    (group, appends) -> appends.forEach(bytes -> writer.append(group, bytes)));
+
+            StateChangelogHandle<?> handle = writer.persist(prev.next()).get();
+
+            assertByteMapsEqual(appendsByKeyGroup, extract(handle));
+        }
+    }
+
+    /**
+     * Asserts that both maps contain the same key groups and, for every key group, the same byte
+     * arrays in the same order.
+     */
+    private void assertByteMapsEqual(
+            Map<Integer, List<byte[]>> expected, Map<Integer, List<byte[]>> actual) {
+        // Compare key sets rather than only sizes: with equal sizes but different keys the lookup
+        // below would return null and the helper would die with a NullPointerException instead
+        // of reporting an assertion failure.
+        assertEquals(expected.keySet(), actual.keySet());
+        for (Map.Entry<Integer, List<byte[]>> e : expected.entrySet()) {
+            List<byte[]> expectedList = e.getValue();
+            List<byte[]> actualList = actual.get(e.getKey());
+            assertEquals(expectedList.size(), actualList.size());
+            Iterator<byte[]> expectedIt = expectedList.iterator();
+            Iterator<byte[]> actualIt = actualList.iterator();
+            while (expectedIt.hasNext()) {
+                assertArrayEquals(expectedIt.next(), actualIt.next());
+            }
+        }
+    }
+
+    /**
+     * Reads all changes from the handle and groups their payloads by key group, preserving the
+     * order in which the handle's iterator returns them.
+     */
+    private Map<Integer, List<byte[]>> extract(StateChangelogHandle<?> handle) throws Exception {
+        Map<Integer, List<byte[]>> changes = new HashMap<>();
+        // The wildcard handle is widened so that the untyped context from getContext() fits.
+        @SuppressWarnings("unchecked")
+        StateChangelogHandle<Object> objHandle = (StateChangelogHandle<Object>) handle;
+        try (CloseableIterator<StateChange> it = objHandle.getChanges(getContext())) {
+            while (it.hasNext()) {
+                StateChange change = it.next();
+                changes.computeIfAbsent(change.getKeyGroup(), k -> new ArrayList<>())
+                        .add(change.getChange());
+            }
+        }
+        return changes;
+    }
+
+    /** Generates {@code appendsPerGroup} random payloads for every key group in the range. */
+    private Map<Integer, List<byte[]>> generateAppends(
+            KeyGroupRange kgRange, int keyLen, int appendsPerGroup) {
+        return stream(kgRange.spliterator(), false)
+                .collect(toMap(identity(), unused -> generateData(appendsPerGroup, keyLen)));
+    }
+
+    /** Generates {@code numAppends} random byte arrays of length {@code keyLen}. */
+    private List<byte[]> generateData(int numAppends, int keyLen) {
+        return Stream.generate(() -> randomBytes(keyLen))
+                .limit(numAppends)
+                .collect(Collectors.toList());
+    }
+
+    /** Returns a new array of {@code len} random bytes. */
+    private byte[] randomBytes(int len) {
+        byte[] bytes = new byte[len];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    /** Factory under test. */
+    private InMemoryStateChangelogWriterFactory getFactory() {
+        return new InMemoryStateChangelogWriterFactory();
+    }
+
+    /** Context passed to {@code getChanges}; this test passes {@code null}. */
+    private Object getContext() {
+        return null;
+    }
+}