You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/15 11:09:05 UTC

[flink-statefun] 14/17: [FLINK-17533] Add support for multiple concurrent checkpoints

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 12626a5b381dfebb40515d03992bdf243f7e603d
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue May 5 22:32:10 2020 +0200

    [FLINK-17533] Add support for multiple concurrent checkpoints
---
 .../statefun/flink/core/feedback/Checkpoints.java  |  61 +++++++++
 .../flink/core/feedback/FeedbackUnionOperator.java |  34 ++---
 .../statefun/flink/core/logger/FeedbackLogger.java |  33 +++++
 .../flink/core/logger/UnboundedFeedbackLogger.java |   5 +-
 .../flink/core/feedback/CheckpointsTest.java       | 143 +++++++++++++++++++++
 5 files changed, 260 insertions(+), 16 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
new file mode 100644
index 0000000..8fd0322
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/Checkpoints.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.feedback;
+
+import java.io.OutputStream;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.IOUtils;
+
+final class Checkpoints<T> implements AutoCloseable { // one FeedbackLogger per open checkpoint
+  private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory;
+  private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>();
+
+  Checkpoints(Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory) {
+    this.feedbackLoggerFactory = Objects.requireNonNull(feedbackLoggerFactory);
+  }
+
+  public void startLogging(long checkpointId, OutputStream outputStream) {
+    FeedbackLogger<T> logger = feedbackLoggerFactory.get(); // one logger per startLogging call
+    logger.startLogging(outputStream);
+    uncompletedCheckpoints.put(checkpointId, logger); // tracked until committed or closed
+  }
+
+  public void append(T element) {
+    for (FeedbackLogger<T> logger : uncompletedCheckpoints.values()) { // every open checkpoint
+      logger.append(element);
+    }
+  }
+
+  public void commitCheckpointsUntil(long checkpointId) {
+    SortedMap<Long, FeedbackLogger<T>> completedCheckpoints =
+        uncompletedCheckpoints.headMap(checkpointId, true); // live view of ids <= checkpointId
+    completedCheckpoints.values().forEach(FeedbackLogger::commit);
+    completedCheckpoints.clear(); // clearing the view also removes them from the TreeMap
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeAllQuietly(uncompletedCheckpoints.values()); // loggers that were never committed
+    uncompletedCheckpoints.clear();
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
index 402ded8..78fe10d 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/feedback/FeedbackUnionOperator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.statefun.flink.core.feedback;
 
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.concurrent.Executor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
 import org.apache.flink.statefun.flink.core.common.SerializableFunction;
-import org.apache.flink.statefun.flink.core.common.SerializablePredicate;
 import org.apache.flink.statefun.flink.core.logger.Loggers;
 import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
+import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLoggerFactory;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.MailboxExecutor;
@@ -43,20 +44,20 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
 
   // -- configuration
   private final FeedbackKey<T> feedbackKey;
-  private final SerializablePredicate<T> isBarrierMessage;
+  private final SerializableFunction<T, OptionalLong> isBarrierMessage;
   private final SerializableFunction<T, ?> keySelector;
   private final long totalMemoryUsedForFeedbackCheckpointing;
   private final TypeSerializer<T> elementSerializer;
 
   // -- runtime
-  private transient UnboundedFeedbackLogger<T> feedbackLogger;
+  private transient Checkpoints<T> checkpoints;
   private transient boolean closedOrDisposed;
   private transient MailboxExecutor mailboxExecutor;
   private transient StreamRecord<T> reusable;
 
   FeedbackUnionOperator(
       FeedbackKey<T> feedbackKey,
-      SerializablePredicate<T> isBarrierMessage,
+      SerializableFunction<T, OptionalLong> isBarrierMessage,
       SerializableFunction<T, ?> keySelector,
       long totalMemoryUsedForFeedbackCheckpointing,
       TypeSerializer<T> elementSerializer,
@@ -84,11 +85,12 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     if (closedOrDisposed) {
       return;
     }
-    if (isBarrierMessage.test(element)) {
-      feedbackLogger.commit();
+    OptionalLong maybeCheckpoint = isBarrierMessage.apply(element);
+    if (maybeCheckpoint.isPresent()) {
+      checkpoints.commitCheckpointsUntil(maybeCheckpoint.getAsLong());
     } else {
       sendDownstream(element);
-      feedbackLogger.append(element);
+      checkpoints.append(element);
     }
   }
 
@@ -105,22 +107,24 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
     // Initialize the unbounded feedback logger
     //
     @SuppressWarnings("unchecked")
-    UnboundedFeedbackLogger<T> feedbackLogger =
-        (UnboundedFeedbackLogger<T>)
-            Loggers.unboundedSpillableLogger(
+    UnboundedFeedbackLoggerFactory<T> feedbackLoggerFactory =
+        (UnboundedFeedbackLoggerFactory<T>)
+            Loggers.unboundedSpillableLoggerFactory(
                 ioManager,
                 maxParallelism,
                 totalMemoryUsedForFeedbackCheckpointing,
                 elementSerializer,
                 keySelector);
 
-    this.feedbackLogger = feedbackLogger;
+    this.checkpoints = new Checkpoints<>(feedbackLoggerFactory::create);
+
     //
     // we first must reply previously check-pointed envelopes before we start
     // processing any new envelopes.
     //
+    UnboundedFeedbackLogger<T> logger = feedbackLoggerFactory.create();
     for (KeyGroupStatePartitionStreamProvider keyedStateInput : context.getRawKeyedStateInputs()) {
-      this.feedbackLogger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
+      logger.replyLoggedEnvelops(keyedStateInput.getStream(), this);
     }
     //
     // now we can start processing new messages. We do so by registering ourselves as a
@@ -132,7 +136,7 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   @Override
   public void snapshotState(StateSnapshotContext context) throws Exception {
     super.snapshotState(context);
-    this.feedbackLogger.startLogging(context.getRawKeyedOperatorStateOutput());
+    checkpoints.startLogging(context.getCheckpointId(), context.getRawKeyedOperatorStateOutput());
   }
 
   @Override
@@ -152,8 +156,8 @@ public final class FeedbackUnionOperator<T> extends AbstractStreamOperator<T>
   // ------------------------------------------------------------------------------------------------------------------
 
   private void closeInternally() {
-    IOUtils.closeQuietly(feedbackLogger);
-    feedbackLogger = null;
+    IOUtils.closeQuietly(checkpoints);
+    checkpoints = null;
     closedOrDisposed = true;
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
new file mode 100644
index 0000000..465a717
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.logger;
+
+import java.io.OutputStream;
+
+public interface FeedbackLogger<T> extends AutoCloseable { // close() comes from AutoCloseable
+
+  /** Start logging messages into the supplied keyed state checkpoint output stream. */
+  void startLogging(OutputStream keyedStateCheckpointOutputStream);
+
+  /** Append a message to the currently logging logger. */
+  void append(T message);
+
+  /** Commit the currently logging logger. */
+  void commit();
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
index ef0360a..409f714 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
 import org.apache.flink.util.IOUtils;
 
-public final class UnboundedFeedbackLogger<T> implements Closeable {
+public final class UnboundedFeedbackLogger<T> implements FeedbackLogger<T> {
   private final Supplier<KeyGroupStream<T>> supplier;
   private final ToIntFunction<T> keyGroupAssigner;
   private final Map<Integer, KeyGroupStream<T>> keyGroupStreams;
@@ -60,6 +60,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     this.checkpointedStreamOperations = Objects.requireNonNull(ops);
   }
 
+  @Override
   public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
     this.checkpointedStreamOperations.requireKeyedStateCheckpointed(
         keyedStateCheckpointOutputStream);
@@ -68,6 +69,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
         checkpointedStreamOperations.acquireLease(keyedStateCheckpointOutputStream);
   }
 
+  @Override
   public void append(T message) {
     if (keyedStateOutputStream == null) {
       //
@@ -79,6 +81,7 @@ public final class UnboundedFeedbackLogger<T> implements Closeable {
     keyGroup.append(message);
   }
 
+  @Override
   public void commit() {
     try {
       flushToKeyedStateOutputStream();
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
new file mode 100644
index 0000000..7b8dbee
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.feedback;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+public class CheckpointsTest {
+
+  @Test
+  public void usageExample() {
+    Loggers loggers = new Loggers(); // records each logger that Checkpoints asks for
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("hello");
+    checkpoints.append("world");
+    checkpoints.commitCheckpointsUntil(1);
+
+    assertThat(loggers.items(0), contains("hello", "world")); // index 0 = first logger created
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+  }
+
+  @Test
+  public void dataIsAppendedToMultipleLoggers() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.append("a"); // only checkpoint 1 is open
+
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.append("b"); // checkpoints 1 and 2 are both open
+
+    checkpoints.commitCheckpointsUntil(1);
+    checkpoints.append("c"); // only checkpoint 2 is still open
+
+    checkpoints.commitCheckpointsUntil(2);
+
+    assertThat(loggers.items(0), contains("a", "b"));
+    assertThat(loggers.items(1), contains("b", "c"));
+  }
+
+  @Test
+  public void committingALaterCheckpointCommitsPreviousCheckpoints() {
+    Loggers loggers = new Loggers();
+
+    Checkpoints<String> checkpoints = new Checkpoints<>(loggers);
+
+    checkpoints.startLogging(1, new ByteArrayOutputStream());
+    checkpoints.startLogging(2, new ByteArrayOutputStream());
+    checkpoints.commitCheckpointsUntil(2); // inclusive bound: commits ids 1 and 2
+
+    assertThat(loggers.state(0), is(LoggerState.COMMITTED));
+    assertThat(loggers.state(1), is(LoggerState.COMMITTED));
+  }
+
+  private enum LoggerState {
+    IDLE,
+    LOGGING,
+    COMMITTED,
+    CLOSED
+  }
+
+  private static final class Loggers implements Supplier<FeedbackLogger<String>> {
+    private final List<FakeLogger> loggers = new ArrayList<>(); // in creation order
+
+    @Override
+    public FeedbackLogger<String> get() {
+      FakeLogger logger = new FakeLogger();
+      loggers.add(logger); // remembered so tests can inspect it by index
+      return logger;
+    }
+
+    List<String> items(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.items;
+    }
+
+    LoggerState state(int loggerIndex) {
+      Preconditions.checkElementIndex(loggerIndex, loggers.size());
+      FakeLogger logger = loggers.get(loggerIndex);
+      return logger.state;
+    }
+  }
+
+  private static final class FakeLogger implements FeedbackLogger<String> { // in-memory stand-in
+
+    List<String> items = new ArrayList<>();
+    LoggerState state = LoggerState.IDLE;
+
+    @Override
+    public void startLogging(OutputStream keyedStateCheckpointOutputStream) {
+      Preconditions.checkState(state == LoggerState.IDLE); // may only be started once
+      state = LoggerState.LOGGING;
+    }
+
+    @Override
+    public void append(String message) { // allowed while IDLE or LOGGING
+      Preconditions.checkState(state != LoggerState.COMMITTED);
+      Preconditions.checkState(state != LoggerState.CLOSED);
+      items.add(message);
+    }
+
+    @Override
+    public void commit() {
+      Preconditions.checkState(state == LoggerState.LOGGING); // must have been started
+      state = LoggerState.COMMITTED;
+    }
+
+    @Override
+    public void close() {
+      state = LoggerState.CLOSED;
+    }
+  }
+}