You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/18 16:48:43 UTC

[flink] branch master updated: [FLINK-19503][state] Add StateChangelog API

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c402a3  [FLINK-19503][state] Add StateChangelog API
4c402a3 is described below

commit 4c402a3ad6d5f9c5378d222f41d92d129b5cbcfa
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Oct 6 20:38:45 2020 +0200

    [FLINK-19503][state] Add StateChangelog API
---
 .../runtime/state/changelog/SequenceNumber.java    |  83 ++++++++++++
 .../flink/runtime/state/changelog/StateChange.java |  48 +++++++
 .../state/changelog/StateChangelogHandle.java      |  35 +++++
 .../changelog/StateChangelogHandleStreamImpl.java  | 149 +++++++++++++++++++++
 .../state/changelog/StateChangelogWriter.java      |  77 +++++++++++
 .../changelog/StateChangelogWriterFactory.java     |  37 +++++
 .../StateChangelogWriterFactoryLoader.java         |  43 ++++++
 .../inmemory/InMemoryStateChangelogHandle.java     |  80 +++++++++++
 .../inmemory/InMemoryStateChangelogWriter.java     | 100 ++++++++++++++
 .../InMemoryStateChangelogWriterFactory.java       |  33 +++++
 ...ime.state.changelog.StateChangelogWriterFactory |  16 +++
 .../StateChangelogWriterFactoryLoaderTest.java     |  66 +++++++++
 .../inmemory/StateChangelogWriterFactoryTest.java  | 135 +++++++++++++++++++
 13 files changed, 902 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
new file mode 100644
index 0000000..c6556fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/SequenceNumber.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A logical timestamp to draw a boundary between the materialized and non-materialized changes.
+ * Maintained by the state backend but implementations may choose to move its generation to {@link
+ * StateChangelogWriterFactory} as an optimization.
+ */
+@Internal
+public interface SequenceNumber extends Comparable<SequenceNumber> {
+
+    SequenceNumber next();
+
+    /** Generic {@link SequenceNumber}. */
+    final class GenericSequenceNumber implements SequenceNumber {
+        public final long number;
+
+        GenericSequenceNumber(long number) {
+            Preconditions.checkArgument(number >= 0);
+            this.number = number;
+        }
+
+        @Override
+        public int compareTo(SequenceNumber o) {
+            Preconditions.checkArgument(o instanceof GenericSequenceNumber);
+            return Long.compare(this.number, ((GenericSequenceNumber) o).number);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GenericSequenceNumber)) {
+                return false;
+            }
+            return number == ((GenericSequenceNumber) o).number;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(number);
+        }
+
+        @Override
+        public SequenceNumber next() {
+            checkState(number < Long.MAX_VALUE);
+            return SequenceNumber.of(number + 1);
+        }
+
+        @Override
+        public String toString() {
+            return Long.toString(number);
+        }
+    }
+
+    static SequenceNumber of(long number) {
+        return new GenericSequenceNumber(number);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
new file mode 100644
index 0000000..7ae60f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+/** Change of state of a keyed operator. Used for generic incremental checkpoints. */
+@Internal
+public class StateChange {
+
+    private final int keyGroup;
+    private final byte[] change;
+
+    /**
+     * @param keyGroup key group of the change; must not be negative
+     * @param change payload of the change; must not be null (the array is stored as is, without
+     *     copying)
+     */
+    public StateChange(int keyGroup, byte[] change) {
+        Preconditions.checkArgument(keyGroup >= 0);
+        this.keyGroup = keyGroup;
+        this.change = Preconditions.checkNotNull(change);
+    }
+
+    public int getKeyGroup() {
+        return keyGroup;
+    }
+
+    /** Returns the payload array held by this object (not a copy). */
+    public byte[] getChange() {
+        return change;
+    }
+
+    /** Prints the key group and the payload length, never the payload itself. */
+    @Override
+    public String toString() {
+        final int dataSize = change.length;
+        return String.format("keyGroup=%d, dataSize=%d", keyGroup, dataSize);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
new file mode 100644
index 0000000..a4a8656
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.IOException;
+
+/**
+ * A handle to saved {@link StateChange state changes}.
+ *
+ * @param <ReaderContext> type of context used while reading (on TM).
+ */
+@Internal
+public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle {
+
+    /**
+     * Returns an iterator over the {@link StateChange state changes} referenced by this handle.
+     * The result is a {@link CloseableIterator}; presumably the caller is expected to close it
+     * once done (TODO confirm against callers).
+     *
+     * @param context implementation-specific context used while reading
+     * @return iterator over the saved changes
+     * @throws IOException declared for implementations that fail to read the changes
+     */
+    CloseableIterator<StateChange> getChanges(ReaderContext context) throws IOException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
new file mode 100644
index 0000000..a8ab78f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}.
+ *
+ * <p>Changes are kept as an ordered list of (stream handle, offset) pairs and are decoded lazily
+ * by a caller-supplied {@link StateChangeStreamReader}.
+ */
+@Internal
+public final class StateChangelogHandleStreamImpl
+        implements StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
+    private static final long serialVersionUID = -8070326169926626355L;
+
+    private final KeyGroupRange keyGroupRange;
+    /** NOTE: order is important as it reflects the order of changes. */
+    private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
+
+    /** Assigned in {@link #registerSharedStates}; {@code null} until then (also transient). */
+    private transient SharedStateRegistry stateRegistry;
+
+    /**
+     * @param handlesAndOffsets stream handles paired with the offset that is passed to the reader,
+     *     in the order of changes
+     * @param keyGroupRange key group range reported by {@link #getKeyGroupRange()}
+     */
+    public StateChangelogHandleStreamImpl(
+            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange) {
+        this.handlesAndOffsets = handlesAndOffsets;
+        this.keyGroupRange = keyGroupRange;
+    }
+
+    /** Remembers the registry and registers a reference for every underlying stream handle. */
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        this.stateRegistry = stateRegistry;
+        handlesAndOffsets.forEach(
+                handleAndOffset ->
+                        stateRegistry.registerReference(
+                                getKey(handleAndOffset.f0), handleAndOffset.f0));
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyGroupRange;
+    }
+
+    /**
+     * Returns a handle restricted to the overlap of this handle's key group range and the given
+     * one, or {@code null} if the two ranges do not overlap. The returned handle refers to the
+     * same list of stream handles; nothing is copied.
+     */
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        // The parameter shadows the field: intersect this handle's own range with the requested
+        // one (without "this." the argument would be intersected with itself).
+        KeyGroupRange offsets = this.keyGroupRange.getIntersection(keyGroupRange);
+        if (offsets.getNumberOfKeyGroups() == 0) {
+            return null;
+        }
+        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets);
+    }
+
+    /**
+     * Iterates over all changes by reading each (handle, offset) pair, in list order, with the
+     * given reader. A pair is read only once the changes of the previous pair are exhausted.
+     * {@link IOException}s raised by the reader are rethrown unchecked via {@link
+     * ExceptionUtils#rethrow(Throwable)}.
+     */
+    @Override
+    public CloseableIterator<StateChange> getChanges(StateChangeStreamReader reader) {
+        return new CloseableIterator<StateChange>() {
+            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator =
+                    handlesAndOffsets.iterator();
+
+            private CloseableIterator<StateChange> current = CloseableIterator.empty();
+
+            @Override
+            public boolean hasNext() {
+                advance();
+                return current.hasNext();
+            }
+
+            @Override
+            public StateChange next() {
+                advance();
+                return current.next();
+            }
+
+            /** Moves to the next handle that still has changes, if the current one is drained. */
+            private void advance() {
+                while (!current.hasNext() && handleIterator.hasNext()) {
+                    try {
+                        // Release the drained iterator before replacing it; otherwise only the
+                        // last per-handle iterator would ever be closed by close().
+                        current.close();
+                        // Never keep a reference to a closed iterator if the read below fails.
+                        current = CloseableIterator.empty();
+                        Tuple2<StreamStateHandle, Long> handleAndOffset = handleIterator.next();
+                        current = reader.read(handleAndOffset.f0, handleAndOffset.f1);
+                    } catch (Exception e) {
+                        ExceptionUtils.rethrow(e);
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws Exception {
+                current.close();
+            }
+        };
+    }
+
+    /**
+     * Unregisters the references taken in {@link #registerSharedStates}. Does nothing if no
+     * registry was ever set on this instance (the field is transient and starts out null), since
+     * in that case no reference was registered through it.
+     */
+    @Override
+    public void discardState() {
+        final SharedStateRegistry registry = stateRegistry;
+        if (registry == null) {
+            return;
+        }
+        handlesAndOffsets.forEach(
+                handleAndOffset -> registry.unregisterReference(getKey(handleAndOffset.f0)));
+    }
+
+    @Override
+    public long getStateSize() {
+        // NOTE(review): always reports 0 regardless of the underlying handles - confirm whether
+        // the sizes of the stream handles should be accounted for here.
+        return 0;
+    }
+
+    private static SharedStateRegistryKey getKey(StreamStateHandle stateHandle) {
+        // StateHandle key used in SharedStateRegistry should only be based on the file name
+        // and not on backend UUID or keygroup (multiple handles can refer to the same file and
+        // making keys unique will effectively disable sharing)
+        if (stateHandle instanceof FileStateHandle) {
+            return new SharedStateRegistryKey(
+                    ((FileStateHandle) stateHandle).getFilePath().toString());
+        } else if (stateHandle instanceof ByteStreamStateHandle) {
+            return new SharedStateRegistryKey(
+                    ((ByteStreamStateHandle) stateHandle).getHandleName());
+        } else {
+            // Fallback for other handle types: identity-based key, i.e. tied to this object.
+            return new SharedStateRegistryKey(
+                    Integer.toString(System.identityHashCode(stateHandle)));
+        }
+    }
+
+    /** Decodes the {@link StateChange state changes} stored in a {@link StreamStateHandle}. */
+    public interface StateChangeStreamReader {
+        CloseableIterator<StateChange> read(StreamStateHandle handle, long offset)
+                throws IOException;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
new file mode 100644
index 0000000..a206d3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */
+@Internal
+public interface StateChangelogWriter<Handle extends StateChangelogHandle<?>>
+        extends AutoCloseable {
+
+    /**
+     * Get {@link SequenceNumber} of the last element added by {@link #append(int, byte[]) append}.
+     */
+    SequenceNumber lastAppendedSequenceNumber();
+
+    /** Appends the provided data to this log. No persistency guarantees. */
+    void append(int keyGroup, byte[] value);
+
+    /**
+     * Durably persist previously {@link #append(int, byte[]) appended} data starting from the
+     * provided {@link SequenceNumber} and up to the latest change added. After this call, one of
+     * {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber,
+     * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be
+     * called for the corresponding change set.
+     *
+     * @param from inclusive
+     * @return a future of the handle to the persisted changes
+     */
+    CompletableFuture<Handle> persist(SequenceNumber from) throws IOException;
+
+    /**
+     * Truncate this state changelog to free up resources. Called upon state materialization. Any
+     * {@link #persist(SequenceNumber) persisted} state changes will be discarded unless previously
+     * {@link #confirm confirmed}.
+     *
+     * @param to exclusive
+     */
+    void truncate(SequenceNumber to);
+
+    /**
+     * Mark the given state changes as confirmed by the JM.
+     *
+     * @param from inclusive
+     * @param to exclusive
+     */
+    void confirm(SequenceNumber from, SequenceNumber to);
+
+    /**
+     * Reset the state of the given state changes. Called upon abortion so that, if requested
+     * later, these changes will be re-uploaded.
+     *
+     * @param from start of the range; presumably inclusive as in {@link #confirm} (TODO confirm)
+     * @param to end of the range; presumably exclusive as in {@link #confirm} (TODO confirm)
+     */
+    void reset(SequenceNumber from, SequenceNumber to);
+
+    /**
+     * Close this log. No new appends will be possible. Any appended but not persisted records will
+     * be lost.
+     */
+    void close();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
new file mode 100644
index 0000000..98e1479
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * {@link StateChangelogWriter} factory. Scoped to a single entity (e.g. a SubTask or
+ * OperatorCoordinator). Please use {@link StateChangelogWriterFactoryLoader} to obtain an instance.
+ *
+ * @param <Handle> type of {@link StateChangelogHandle} produced by the created writers
+ */
+@Internal
+public interface StateChangelogWriterFactory<Handle extends StateChangelogHandle<?>>
+        extends AutoCloseable {
+
+    /**
+     * Creates a writer for the given operator and key group range.
+     *
+     * @param operatorID operator the writer is created for
+     * @param keyGroupRange key group range the writer is created for
+     */
+    StateChangelogWriter<Handle> createWriter(OperatorID operatorID, KeyGroupRange keyGroupRange);
+
+    /** Does nothing by default; implementations holding resources may override. */
+    @Override
+    default void close() throws Exception {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
new file mode 100644
index 0000000..8e0bffa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactoryLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.plugin.PluginManager;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat;
+
+/** A thin wrapper around {@link PluginManager} to load {@link StateChangelogWriterFactory}. */
+@Internal
+public class StateChangelogWriterFactoryLoader {
+    private final PluginManager pluginManager;
+
+    public StateChangelogWriterFactoryLoader(PluginManager pluginManager) {
+        this.pluginManager = pluginManager;
+    }
+
+    /**
+     * Returns the factories found through the {@link PluginManager} followed by those found
+     * through {@link ServiceLoader}.
+     */
+    @SuppressWarnings({"rawtypes"})
+    public Iterator<StateChangelogWriterFactory> load() {
+        final Iterator<StateChangelogWriterFactory> fromPlugins =
+                pluginManager.load(StateChangelogWriterFactory.class);
+        final Iterator<StateChangelogWriterFactory> fromServiceLoader =
+                ServiceLoader.load(StateChangelogWriterFactory.class).iterator();
+        return concat(fromPlugins, fromServiceLoader);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
new file mode 100644
index 0000000..e43a2ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * {@link StateChangelogHandle} that keeps the changes on the heap, grouped by key group. Key
+ * group range, intersection and shared state registration are not supported.
+ */
+class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Map<Integer, List<byte[]>> changes;
+
+    public InMemoryStateChangelogHandle(Map<Integer, List<byte[]>> changes) {
+        this.changes = changes;
+    }
+
+    /** Flattens the per-key-group payloads into a single list and iterates over it. */
+    @Override
+    public CloseableIterator<StateChange> getChanges(Void unused) {
+        List<StateChange> flattened =
+                changes.entrySet().stream()
+                        .flatMap(InMemoryStateChangelogHandle::toStateChanges)
+                        .collect(toList());
+        return CloseableIterator.fromList(flattened, ignored -> {});
+    }
+
+    /** Wraps every payload of one key group into a {@link StateChange}. */
+    private static Stream<StateChange> toStateChanges(Map.Entry<Integer, List<byte[]>> entry) {
+        int keyGroup = entry.getKey();
+        return entry.getValue().stream().map(payload -> new StateChange(keyGroup, payload));
+    }
+
+    /** Nothing to discard. */
+    @Override
+    public void discardState() {}
+
+    @Override
+    public long getStateSize() {
+        return 0;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
new file mode 100644
index 0000000..5ab68e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * In-memory {@link StateChangelogWriter}: appended changes are kept on the heap, grouped by key
+ * group and ordered by {@link SequenceNumber}. {@link #confirm} and {@link #reset} are not
+ * supported.
+ */
+@NotThreadSafe
+class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
+
+    /** Changes per key group, sorted by the sequence number they were appended with. */
+    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup =
+            new HashMap<>();
+    /** Sequence number of the last appended change; 0 while nothing has been appended. */
+    private long sqn = 0L;
+    private boolean closed;
+
+    /**
+     * Stores the change under the next sequence number.
+     *
+     * @throws IllegalStateException if this writer has been closed
+     */
+    @Override
+    public void append(int keyGroup, byte[] value) {
+        Preconditions.checkState(!closed, "LogWriter is closed");
+        LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
+        changesByKeyGroup
+                .computeIfAbsent(keyGroup, unused -> new TreeMap<>())
+                .put(SequenceNumber.of(++sqn), value);
+    }
+
+    @Override
+    public SequenceNumber lastAppendedSequenceNumber() {
+        return SequenceNumber.of(sqn);
+    }
+
+    /**
+     * Returns an already completed future holding a snapshot of all changes with a sequence number
+     * greater than or equal to {@code from}.
+     *
+     * @param from inclusive lower bound; must not be null
+     */
+    @Override
+    public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) {
+        // Validate before doing anything else, including logging.
+        Preconditions.checkNotNull(from);
+        LOG.debug("Persist after {}", from);
+        return completedFuture(new InMemoryStateChangelogHandle(collectChanges(from)));
+    }
+
+    /** Copies, per key group, the payloads appended at or after {@code from} (inclusive). */
+    private Map<Integer, List<byte[]>> collectChanges(SequenceNumber from) {
+        return changesByKeyGroup.entrySet().stream()
+                .collect(
+                        toMap(
+                                Map.Entry::getKey,
+                                kv ->
+                                        new ArrayList<>(
+                                                kv.getValue().tailMap(from, true).values())));
+    }
+
+    /**
+     * Marks this writer as closed so that further appends are rejected.
+     *
+     * @throws IllegalStateException if this writer is already closed
+     */
+    @Override
+    public void close() {
+        Preconditions.checkState(!closed);
+        closed = true;
+    }
+
+    /**
+     * Drops, for every key group, all changes with a sequence number strictly below {@code
+     * before}, so that their memory can be reclaimed.
+     *
+     * @param before exclusive upper bound of the changes to drop
+     */
+    @Override
+    public void truncate(SequenceNumber before) {
+        // headMap returns a view backed by the per-key-group map, so clearing the view removes
+        // the entries from the changelog itself.
+        changesByKeyGroup.forEach(
+                (keyGroup, changesBySqn) -> changesBySqn.headMap(before, false).clear());
+    }
+
+    @Override
+    public void confirm(SequenceNumber from, SequenceNumber to) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reset(SequenceNumber from, SequenceNumber to) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
new file mode 100644
index 0000000..7196545
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+
+/** An in-memory (non-production) implementation of {@link StateChangelogWriterFactory}. */
+public class InMemoryStateChangelogWriterFactory
+        implements StateChangelogWriterFactory<InMemoryStateChangelogHandle> {
+
+    /**
+     * Creates a new {@link InMemoryStateChangelogWriter} on every call. Both arguments are ignored
+     * by this implementation.
+     */
+    @Override
+    public InMemoryStateChangelogWriter createWriter(
+            OperatorID operatorID, KeyGroupRange keyGroupRange) {
+        return new InMemoryStateChangelogWriter();
+    }
+}
diff --git a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
new file mode 100644
index 0000000..bf1a4d0
--- /dev/null
+++ b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogWriterFactory
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
new file mode 100644
index 0000000..88d6f03
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryLoaderTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactoryLoader;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.shaded.curator4.com.google.common.collect.ImmutableList.copyOf;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertTrue;
+
+public class StateChangelogWriterFactoryLoaderTest {
+
+    @Test
+    public void testLoadSpiImplementation() {
+        assertTrue(
+                new StateChangelogWriterFactoryLoader(getPluginManager(emptyIterator()))
+                        .load()
+                        .hasNext());
+    }
+
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void testLoadPluginImplementation() {
+        StateChangelogWriterFactory<?> impl = new InMemoryStateChangelogWriterFactory();
+        PluginManager pluginManager = getPluginManager(singletonList(impl).iterator());
+        Iterator<StateChangelogWriterFactory> loaded =
+                new StateChangelogWriterFactoryLoader(pluginManager).load();
+        assertTrue(copyOf(loaded).contains(impl));
+    }
+
+    private PluginManager getPluginManager(
+            Iterator<? extends StateChangelogWriterFactory<?>> iterator) {
+        return new PluginManager() {
+
+            @Override
+            public <P> Iterator<P> load(Class<P> service) {
+                checkArgument(service.equals(StateChangelogWriterFactory.class));
+                //noinspection unchecked
+                return (Iterator<P>) iterator;
+            }
+        };
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
new file mode 100644
index 0000000..8887525
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog.inmemory;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.StreamSupport.stream;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** {@link InMemoryStateChangelogWriterFactory} test. */
+public class StateChangelogWriterFactoryTest {
+
+    private final Random random = new Random();
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Test(expected = IllegalStateException.class)
+    public void testNoAppendAfterClose() {
+        StateChangelogWriter<?> writer =
+                getFactory().createWriter(new OperatorID(), KeyGroupRange.of(0, 0));
+        writer.close();
+        writer.append(0, new byte[0]);
+    }
+
+    @Test
+    public void testWriteAndRead() throws Exception {
+        KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
+        Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20);
+
+        try (StateChangelogWriterFactory<?> client = getFactory();
+                StateChangelogWriter<?> writer = client.createWriter(new OperatorID(), kgRange)) {
+            SequenceNumber prev = writer.lastAppendedSequenceNumber();
+            appendsByKeyGroup.forEach(
+                    (group, appends) -> appends.forEach(bytes -> writer.append(group, bytes)));
+
+            StateChangelogHandle<?> handle = writer.persist(prev.next()).get();
+
+            assertByteMapsEqual(appendsByKeyGroup, extract(handle));
+        }
+    }
+
+    private void assertByteMapsEqual(
+            Map<Integer, List<byte[]>> expected, Map<Integer, List<byte[]>> actual) {
+        assertEquals(expected.size(), actual.size());
+        for (Map.Entry<Integer, List<byte[]>> e : expected.entrySet()) {
+            List<byte[]> expectedList = e.getValue();
+            List<byte[]> actualList = actual.get(e.getKey());
+            Iterator<byte[]> ite = expectedList.iterator(), ale = actualList.iterator();
+            while (ite.hasNext() && ale.hasNext()) {
+                assertArrayEquals(ite.next(), ale.next());
+            }
+            assertFalse(ite.hasNext());
+            assertFalse(ale.hasNext());
+        }
+    }
+
+    private Map<Integer, List<byte[]>> extract(StateChangelogHandle<?> handle) throws Exception {
+        Map<Integer, List<byte[]>> changes = new HashMap<>();
+        //noinspection unchecked
+        StateChangelogHandle<Object> objHandle = (StateChangelogHandle<Object>) handle;
+        try (CloseableIterator<StateChange> it = objHandle.getChanges(getContext())) {
+            while (it.hasNext()) {
+                StateChange change = it.next();
+                changes.computeIfAbsent(change.getKeyGroup(), k -> new ArrayList<>())
+                        .add(change.getChange());
+            }
+        }
+        return changes;
+    }
+
+    private Map<Integer, List<byte[]>> generateAppends(
+            KeyGroupRange kgRange, int keyLen, int appendsPerGroup) {
+        return stream(kgRange.spliterator(), false)
+                .collect(toMap(identity(), unused -> generateData(appendsPerGroup, keyLen)));
+    }
+
+    private List<byte[]> generateData(int numAppends, int keyLen) {
+        return Stream.generate(() -> randomBytes(keyLen))
+                .limit(numAppends)
+                .collect(Collectors.toList());
+    }
+
+    private byte[] randomBytes(int len) {
+        byte[] bytes = new byte[len];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    private InMemoryStateChangelogWriterFactory getFactory() {
+        return new InMemoryStateChangelogWriterFactory();
+    }
+
+    private Object getContext() {
+        return null;
+    }
+}