You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:09:05 UTC
[flink-statefun] 14/17: [FLINK-17533] Add support for multiple
concurrent checkpoints
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 12626a5b381dfebb40515d03992bdf243f7e603d
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:32:10 2020 +0200
[FLINK-17533] Add support for multiple concurrent checkpoints
---
.../statefun/flink/core/feedback/Checkpoints.java | 61 +++++++++
.../flink/core/feedback/FeedbackUnionOperator.java | 34 ++---
.../statefun/flink/core/logger/FeedbackLogger.java | 33 +++++
.../flink/core/logger/UnboundedFeedbackLogger.java | 5 +-
.../flink/core/feedback/CheckpointsTest.java | 143 +++++++++++++++++++++
5 files changed, 260 insertions(+), 16 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
new file mode 100644
index 0000000..8fd0322
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.feedback;
+
+import java.io.OutputStream;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.IOUtils;
+
+final class Checkpoints<T> implements AutoCloseable { // one FeedbackLogger per uncommitted checkpoint id
+ private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory; // supplies a logger per checkpoint
+ private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>(); // keyed by checkpoint id
+ /** Creates a tracker that asks {@code feedbackLoggerFactory} (must be non-null) for each new logger. */
+ Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) {
+ this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory);
+ }
+ /** Obtains a logger from the factory, starts it on {@code outputStream}, and registers it under {@code checkpointId}. */
+ public void startLogging(long checkpointId, OutputStream outputStream) {
+ FeedbackLogger<T> logger = feedbackLoggerFactory.get();
+ logger.startLogging(outputStream);
+ uncompletedCheckpoints.put(checkpointId, logger);
+ }
+ /** Appends {@code element} to every logger that is currently registered (started but not yet committed). */
+ public void append(T element) {
+ for (FeedbackLogger<T> logger : uncompletedCheckpoints.values()) {
+ logger.append(element);
+ }
+ }
+ /** Commits, then unregisters, every logger whose checkpoint id is less than or equal to {@code checkpointId}. */
+ public void commitCheckpointsUntil(long checkpointId) {
+ SortedMap<Long, FeedbackLogger<T>> completedCheckpoints =
+ uncompletedCheckpoints.headMap(checkpointId, true); // inclusive view backed by uncompletedCheckpoints
+ completedCheckpoints.values().forEach(FeedbackLogger::commit);
+ completedCheckpoints.clear(); // clearing the view removes these entries from uncompletedCheckpoints
+ }
+ /** Passes every still-registered logger to {@code IOUtils.closeAllQuietly} and then empties the map. */
+ @Override
+ public void close() {
+ IOUtils.closeAllQuietly(uncompletedCheckpoints.values());
+ uncompletedCheckpoints.clear();
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 402ded8..78fe10d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.statefun.flink.core.feedback;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
import org.apache.flink.statefun.flink.core.logger.Loggers;
import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
+import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
@@ -43,20 +44,20 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
// -- configuration
private final FeedbackKey<T> feedbackKey;
- private final SerializablePredicate<T> isBarrierMessage;
+ private final SerializableFunction<T, OptionalLong> isBarrierMessage;
private final SerializableFunction<T, ?> keySelector;
private final long totalMemoryUsedForFeedbackCheckpointing;
private final TypeSerializer<T> elementSerializer;
// -- runtime
- private transient UnboundedFeedbackLogger<T> feedbackLogger;
+ private transient Checkpoints<T> checkpoints;
private transient boolean closedOrDisposed;
private transient MailboxExecutor mailboxExecutor;
private transient StreamRecord<T> reusable;
FeedbackUnionOperator(
FeedbackKey<T> feedbackKey,
- SerializablePredicate<T> isBarrierMessage,
+ SerializableFunction<T, OptionalLong> isBarrierMessage,
SerializableFunction<T, ?> keySelector,
long totalMemoryUsedForFeedbackCheckpointing,
TypeSerializer<T> elementSerializer,
@@ -84,11 +85,12 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
if (closedOrDisposed) {
return;
}
- if (isBarrierMessage.test(element)) {
- feedbackLogger.commit();
+ OptionalLong maybeCheckpoint = isBarrierMessage.apply(element);
+ if (maybeCheckpoint.isPresent()) {
+ checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong());
} else {
sendDownstream(element);
- feedbackLogger.append(element);
+ checkpoints.append(element);
}
}
@@ -105,22 +107,24 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
// Initialize the unbounded feedback logger
//
@SuppressWarnings("unchecked")
- UnboundedFeedbackLogger<T> feedbackLogger =
- (UnboundedFeedbackLogger<T>)
- Loggers.unboundedSpillableLogger(
+ UnboundedFeedbackLoggerFactory<T> feedbackLoggerFactory =
+ (UnboundedFeedbackLoggerFactory<T>)
+ Loggers.unboundedSpillableLoggerFactory(
ioManager,
maxParallelism,
totalMemoryUsedForFeedbackCheckpointing,
elementSerializer,
keySelector);
- this.feedbackLogger = feedbackLogger;
+ this.checkpoints = new Checkpoints<>(feedbackLoggerFactory::create);
+
//
// we first must reply previously check-pointed envelopes before we start
// processing any new envelopes.
//
+ UnboundedFeedbackLogger<T> logger = feedbackLoggerFactory.create();
for (KeyGroupStatePartitionStreamProvider keyedStateInput : context.getRawKeyedStateInputs()) {
- this.feedbackLogger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
+ logger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
}
//
// now we can start processing new messages. We do so by registering ourselves as a
@@ -132,7 +136,7 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
- this.feedbackLogger.startLogging(context.getRawKeyedOperatorStateOutput());
+ checkpoints.startLogging(context.getCheckpointId(), context.getRawKeyedOperatorStateOutput());
}
@Override
@@ -152,8 +156,8 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
// ------------------------------------------------------------------------------------------------------------------
private void closeInternally() {
- IOUtils.closeQuietly(feedbackLogger);
- feedbackLogger = null;
+ IOUtils.closeQuietly(checkpoints);
+ checkpoints = null;
closedOrDisposed = true;
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
new file mode 100644
index 0000000..465a717
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.io.OutputStream;
+
+public interface FeedbackLogger<T> extends AutoCloseable {
+ // Call order used by Checkpoints in this commit: startLogging, then append zero or more times, then commit.
+ /** Starts logging messages into the supplied output stream. */
+ void startLogging(OutputStream keyedStateCheckpointOutputStream);
+
+ /** Appends a message to this logger. */
+ void append(T message);
+
+ /** Commits this logger. */
+ void commit();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index ef0360a..409f714 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.util.IOUtils;
-public final class UnboundedFeedbackLogger<T> implements Closeable {
+public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> {
private final Supplier<KeyGroupStream<T>> supplier;
private final ToIntFunction<T> keyGroupAssigner;
private final Map<Integer, KeyGroupStream<T>> keyGroupStreams;
@@ -60,6 +60,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
this.checkpointedStreamOperations = Objects.requireNonNull(ops);
}
+ @Override
public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
this.checkpointedStreamOperations.requireKeyedStateCheckpointed(
keyedStateCheckpointOutputStream);
@@ -68,6 +69,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
checkpointedStreamOperations.acquireLease(keyedStateCheckpointOutputStream);
}
+ @Override
public void append(T message) {
if (keyedStateOutputStream == null) {
//
@@ -79,6 +81,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
keyGroup.append(message);
}
+ @Override
public void commit() {
try {
flushToKeyedStateOutputStream();
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
new file mode 100644
index 0000000..7b8dbee
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.feedback;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+public class CheckpointsTest {
+ /** Single checkpoint: appended elements reach its logger, and committing marks that logger COMMITTED. */
+ @Test
+ public void usageExample() {
+ Loggers loggers = new Loggers();
+
+ Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+ checkpoints.startLogging(1, new ByteArrayOutputStream());
+ checkpoints.append("hello");
+ checkpoints.append("world");
+ checkpoints.commitCheckpointsUntil(1);
+
+ assertThat(loggers.items(0), contains("hello", "world"));
+ assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+ }
+ /** Two overlapping checkpoints: each logger sees only elements appended between its start and its commit. */
+ @Test
+ public void dataIsAppendedToMultipleLoggers() {
+ Loggers loggers = new Loggers();
+
+ Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+ checkpoints.startLogging(1, new ByteArrayOutputStream());
+ checkpoints.append("a"); // only checkpoint 1 is open
+
+ checkpoints.startLogging(2, new ByteArrayOutputStream());
+ checkpoints.append("b"); // checkpoints 1 and 2 are both open
+
+ checkpoints.commitCheckpointsUntil(1);
+ checkpoints.append("c"); // only checkpoint 2 is still open
+
+ checkpoints.commitCheckpointsUntil(2);
+
+ assertThat(loggers.items(0), contains("a", "b"));
+ assertThat(loggers.items(1), contains("b", "c"));
+ }
+ /** Committing up to checkpoint 2 also commits the still-open checkpoint 1. */
+ @Test
+ public void committingALaterCheckpointCommitsPreviousCheckpoints() {
+ Loggers loggers = new Loggers();
+
+ Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+ checkpoints.startLogging(1, new ByteArrayOutputStream());
+ checkpoints.startLogging(2, new ByteArrayOutputStream());
+ checkpoints.commitCheckpointsUntil(2);
+
+ assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+ assertThat(loggers.state(1), is(LoggerState.COMMITTED));
+ }
+ /** Lifecycle states recorded by {@link FakeLogger}; a new FakeLogger starts in IDLE. */
+ private enum LoggerState {
+ IDLE,
+ LOGGING,
+ COMMITTED,
+ CLOSED
+ }
+ /** Logger factory that remembers every {@link FakeLogger} it hands out, in creation order, for inspection. */
+ private static final class Loggers implements Supplier<FeedbackLogger<String>> {
+ private final List<FakeLogger> loggers = new ArrayList<>();
+
+ @Override
+ public FeedbackLogger<String> get() {
+ FakeLogger logger = new FakeLogger();
+ loggers.add(logger);
+ return logger;
+ }
+ /** Returns the items appended to the logger created at position {@code loggerIndex} (0-based). */
+ List<String> items(int loggerIndex) {
+ Preconditions.checkElementIndex(loggerIndex, loggers.size());
+ FakeLogger logger = loggers.get(loggerIndex);
+ return logger.items;
+ }
+ /** Returns the current lifecycle state of the logger created at position {@code loggerIndex} (0-based). */
+ LoggerState state(int loggerIndex) {
+ Preconditions.checkElementIndex(loggerIndex, loggers.size());
+ FakeLogger logger = loggers.get(loggerIndex);
+ return logger.state;
+ }
+ }
+ /** In-memory logger that records appended items and checks that calls arrive in a valid lifecycle order. */
+ private static final class FakeLogger implements FeedbackLogger<String> {
+
+ List<String> items = new ArrayList<>();
+ LoggerState state = LoggerState.IDLE;
+
+ @Override
+ public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
+ Preconditions.checkState(state == LoggerState.IDLE); // may only be started once; the stream is ignored
+ state = LoggerState.LOGGING;
+ }
+
+ @Override
+ public void append(String message) {
+ Preconditions.checkState(state != LoggerState.COMMITTED); // no appends after commit
+ Preconditions.checkState(state != LoggerState.CLOSED); // no appends after close
+ items.add(message);
+ }
+
+ @Override
+ public void commit() {
+ Preconditions.checkState(state == LoggerState.LOGGING); // commit requires a prior startLogging
+ state = LoggerState.COMMITTED;
+ }
+
+ @Override
+ public void close() {
+ state = LoggerState.CLOSED;
+ }
+ }
+}