You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/09 17:24:32 UTC

[flink] 02/04: [refactor][state/changelog] Complete upload tasks in StateChangeUploadScheduler

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f1480af49fb6d70895ecb51a4b74a0cf4ee9afd
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Mar 3 13:37:17 2022 +0100

    [refactor][state/changelog] Complete upload tasks in StateChangeUploadScheduler
    
    ... instead of StateChangeUploader.
    This will allow discarding unnecessarily uploaded state in a subsequent commit.
---
 .../fs/BatchingStateChangeUploadScheduler.java     |  2 +-
 .../flink/changelog/fs/StateChangeFsUploader.java  | 38 ++++---------------
 .../changelog/fs/StateChangeUploadScheduler.java   | 14 ++++---
 .../flink/changelog/fs/StateChangeUploader.java    | 43 +++++++++++++++++++++-
 .../fs/BatchingStateChangeUploadSchedulerTest.java |  9 ++++-
 .../changelog/fs/ChangelogStorageMetricsTest.java  | 20 +++++-----
 .../changelog/fs/TestingStateChangeUploader.java   |  4 +-
 7 files changed, 79 insertions(+), 51 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
index 0951117..de00c55 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
@@ -228,7 +228,7 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
             uploadBatchSizes.update(tasks.size());
             retryingExecutor.execute(
                     retryPolicy,
-                    () -> delegate.upload(tasks),
+                    () -> delegate.upload(tasks).complete(),
                     t -> tasks.forEach(task -> task.fail(t)));
         } catch (Throwable t) {
             tasks.forEach(task -> task.fail(t));
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index a4dbacb..f2755d4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.clock.Clock;
@@ -39,11 +38,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static java.util.stream.Collectors.toList;
 import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
 
 /**
@@ -76,15 +73,13 @@ class StateChangeFsUploader implements StateChangeUploader {
         this.clock = SystemClock.getInstance();
     }
 
-    public void upload(Collection<UploadTask> tasks) throws IOException {
+    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
         final String fileName = generateFileName();
         LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
         Path path = new Path(basePath, fileName);
 
         try {
-            LocalResult result = uploadWithMetrics(path, tasks);
-            result.tasksOffsets.forEach(
-                    (task, offsets) -> task.complete(buildResults(result.handle, offsets)));
+            return uploadWithMetrics(path, tasks);
         } catch (IOException e) {
             metrics.getUploadFailuresCounter().inc();
             try (Closer closer = Closer.create()) {
@@ -96,19 +91,20 @@ class StateChangeFsUploader implements StateChangeUploader {
                 closer.register(() -> fileSystem.delete(path, true));
             }
         }
+        return null; // closer above throws an exception
     }
 
-    private LocalResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
+    private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
             throws IOException {
         metrics.getUploadsCounter().inc();
         long start = clock.relativeTimeNanos();
-        LocalResult result = upload(path, tasks);
+        UploadTasksResult result = upload(path, tasks);
         metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start);
-        metrics.getUploadSizes().update(result.handle.getStateSize());
+        metrics.getUploadSizes().update(result.getStateSize());
         return result;
     }
 
-    private LocalResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
+    private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
         boolean wrappedStreamClosed = false;
         FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
         try {
@@ -121,7 +117,7 @@ class StateChangeFsUploader implements StateChangeUploader {
                 FileStateHandle handle = new FileStateHandle(path, stream.getPos());
                 // WARN: streams have to be closed before returning the results
                 // otherwise JM may receive invalid handles
-                return new LocalResult(tasksOffsets, handle);
+                return new UploadTasksResult(tasksOffsets, handle);
             } finally {
                 wrappedStreamClosed = true;
             }
@@ -132,17 +128,6 @@ class StateChangeFsUploader implements StateChangeUploader {
         }
     }
 
-    private static final class LocalResult {
-        private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
-        private final StreamStateHandle handle;
-
-        public LocalResult(
-                Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) {
-            this.tasksOffsets = tasksOffsets;
-            this.handle = handle;
-        }
-    }
-
     private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException {
         StreamCompressionDecorator instance =
                 compression
@@ -153,13 +138,6 @@ class StateChangeFsUploader implements StateChangeUploader {
         return new OutputStreamWithPos(new BufferedOutputStream(compressed, bufferSize));
     }
 
-    private List<UploadResult> buildResults(
-            StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
-        return offsets.entrySet().stream()
-                .map(e -> UploadResult.of(handle, e.getKey(), e.getValue()))
-                .collect(toList());
-    }
-
     private String generateFileName() {
         return UUID.randomUUID().toString();
     }
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 3396053..7a28165 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -56,18 +56,22 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}. In the simplest form,
- * directly calls {@link StateChangeUploader#upload(Collection)} (UploadTask)}. Other
- * implementations might batch the tasks for efficiency.
+ * directly calls {@link StateChangeUploader#upload(Collection)}. Other implementations might batch
+ * the tasks for efficiency.
  */
 interface StateChangeUploadScheduler extends AutoCloseable {
 
+    /**
+     * Schedule the upload and {@link UploadTask#complete(List) complete} or {@link
+     * UploadTask#fail(Throwable) fail} the corresponding tasks.
+     */
     void upload(UploadTask uploadTask) throws IOException;
 
     static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader) {
         return new StateChangeUploadScheduler() {
             @Override
             public void upload(UploadTask uploadTask) throws IOException {
-                uploader.upload(singletonList(uploadTask));
+                uploader.upload(singletonList(uploadTask)).complete();
             }
 
             @Override
@@ -109,8 +113,8 @@ interface StateChangeUploadScheduler extends AutoCloseable {
     @ThreadSafe
     final class UploadTask {
         final Collection<StateChangeSet> changeSets;
-        final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
         final Consumer<List<UploadResult>> successCallback;
+        final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
         final AtomicBoolean finished = new AtomicBoolean();
 
         public UploadTask(
@@ -118,8 +122,8 @@ interface StateChangeUploadScheduler extends AutoCloseable {
                 Consumer<List<UploadResult>> successCallback,
                 BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
             this.changeSets = new ArrayList<>(changeSets);
-            this.failureCallback = failureCallback;
             this.successCallback = successCallback;
+            this.failureCallback = failureCallback;
         }
 
         public void complete(List<UploadResult> results) {
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
index 6e5df9f..aebe5d2 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
@@ -18,9 +18,16 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.Collectors.toList;
 
 /**
  * The purpose of this interface is to abstract the different implementations of uploading state
@@ -28,5 +35,39 @@ import java.util.Collection;
  * argument which is meant to initiate such an upload.
  */
 interface StateChangeUploader extends AutoCloseable {
-    void upload(Collection<UploadTask> tasks) throws IOException;
+    /**
+     * Execute the upload tasks and return the results. It is the caller's responsibility to {@link
+     * UploadTask#complete(List) complete} the tasks.
+     */
+    UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
+
+    final class UploadTasksResult {
+        private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
+        private final StreamStateHandle handle;
+
+        public UploadTasksResult(
+                Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) {
+            this.tasksOffsets = unmodifiableMap(tasksOffsets);
+            this.handle = Preconditions.checkNotNull(handle);
+        }
+
+        public void complete() {
+            for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : tasksOffsets.entrySet()) {
+                UploadTask task = entry.getKey();
+                Map<StateChangeSet, Long> offsets = entry.getValue();
+                task.complete(buildResults(handle, offsets));
+            }
+        }
+
+        private List<UploadResult> buildResults(
+                StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
+            return offsets.entrySet().stream()
+                    .map(e -> UploadResult.of(handle, e.getKey(), e.getValue()))
+                    .collect(toList());
+        }
+
+        public long getStateSize() {
+            return handle.getStateSize();
+        }
+    }
 }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 7d071f0..d25642c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
@@ -148,7 +150,8 @@ public class BatchingStateChangeUploadSchedulerTest {
                             final AtomicInteger currentAttempt = new AtomicInteger(0);
 
                             @Override
-                            public void upload(Collection<UploadTask> tasks) throws IOException {
+                            public UploadTasksResult upload(Collection<UploadTask> tasks)
+                                    throws IOException {
                                 for (UploadTask uploadTask : tasks) {
                                     if (currentAttempt.getAndIncrement() < maxAttempts - 1) {
                                         throw new IOException();
@@ -156,6 +159,7 @@ public class BatchingStateChangeUploadSchedulerTest {
                                         uploadTask.complete(emptyList());
                                     }
                                 }
+                                return null;
                             }
                         },
                         new DirectScheduledExecutorService(),
@@ -390,9 +394,10 @@ public class BatchingStateChangeUploadSchedulerTest {
 
     private static final class BlockingUploader implements StateChangeUploader {
         @Override
-        public void upload(Collection<UploadTask> tasks) {
+        public UploadTasksResult upload(Collection<UploadTask> tasks) {
             try {
                 Thread.sleep(Long.MAX_VALUE);
+                return new UploadTasksResult(emptyMap(), new EmptyStreamStateHandle());
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index d919451..a4e9e55 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -35,10 +35,11 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.apache.flink.changelog.fs.ChangelogStorageMetricGroup.CHANGELOG_STORAGE_UPLOAD_QUEUE_SIZE;
@@ -264,25 +265,22 @@ public class ChangelogStorageMetricsTest {
         }
 
         @Override
-        public void upload(Collection<UploadTask> tasks) throws IOException {
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
             for (UploadTask uploadTask : tasks) {
                 int currentAttempt = 1 + attemptsPerTask.getOrDefault(uploadTask, 0);
                 if (currentAttempt == maxAttempts) {
                     attemptsPerTask.remove(uploadTask);
-                    uploadTask.complete(Collections.singletonList(getResult(uploadTask)));
+                    map.put(
+                            uploadTask,
+                            uploadTask.changeSets.stream()
+                                    .collect(Collectors.toMap(Function.identity(), ign -> 0L)));
                 } else {
                     attemptsPerTask.put(uploadTask, currentAttempt);
                     throw new IOException();
                 }
             }
-        }
-
-        private UploadResult getResult(UploadTask uploadTask) {
-            return new UploadResult(
-                    new EmptyStreamStateHandle(),
-                    0,
-                    uploadTask.changeSets.iterator().next().getSequenceNumber(),
-                    uploadTask.getSize());
+            return new UploadTasksResult(map, new EmptyStreamStateHandle());
         }
 
         @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
index 5b13367..8e29dd1 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
 
 class TestingStateChangeUploader implements StateChangeUploader {
@@ -44,11 +45,12 @@ class TestingStateChangeUploader implements StateChangeUploader {
     }
 
     @Override
-    public void upload(Collection<UploadTask> tasks) throws IOException {
+    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
         for (UploadTask uploadTask : tasks) {
             this.uploaded.addAll(uploadTask.changeSets);
             this.tasks.add(uploadTask);
         }
+        return new UploadTasksResult(emptyMap(), new ByteStreamStateHandle("", new byte[0]));
     }
 
     public Collection<StateChangeSet> getUploaded() {