You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/09 17:24:32 UTC
[flink] 02/04: [refactor][state/changelog] Complete upload tasks in StateChangeUploadScheduler
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f1480af49fb6d70895ecb51a4b74a0cf4ee9afd
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Thu Mar 3 13:37:17 2022 +0100
[refactor][state/changelog] Complete upload tasks in StateChangeUploadScheduler
... instead of StateChangeUploader.
That would allow discarding unnecessarily uploaded state in a subsequent commit.
---
.../fs/BatchingStateChangeUploadScheduler.java | 2 +-
.../flink/changelog/fs/StateChangeFsUploader.java | 38 ++++---------------
.../changelog/fs/StateChangeUploadScheduler.java | 14 ++++---
.../flink/changelog/fs/StateChangeUploader.java | 43 +++++++++++++++++++++-
.../fs/BatchingStateChangeUploadSchedulerTest.java | 9 ++++-
.../changelog/fs/ChangelogStorageMetricsTest.java | 20 +++++-----
.../changelog/fs/TestingStateChangeUploader.java | 4 +-
7 files changed, 79 insertions(+), 51 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
index 0951117..de00c55 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
@@ -228,7 +228,7 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {
uploadBatchSizes.update(tasks.size());
retryingExecutor.execute(
retryPolicy,
- () -> delegate.upload(tasks),
+ () -> delegate.upload(tasks).complete(),
t -> tasks.forEach(task -> task.fail(t)));
} catch (Throwable t) {
tasks.forEach(task -> task.fail(t));
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index a4dbacb..f2755d4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.clock.Clock;
@@ -39,11 +38,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static java.util.stream.Collectors.toList;
import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
/**
@@ -76,15 +73,13 @@ class StateChangeFsUploader implements StateChangeUploader {
this.clock = SystemClock.getInstance();
}
- public void upload(Collection<UploadTask> tasks) throws IOException {
+ public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
final String fileName = generateFileName();
LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
Path path = new Path(basePath, fileName);
try {
- LocalResult result = uploadWithMetrics(path, tasks);
- result.tasksOffsets.forEach(
- (task, offsets) -> task.complete(buildResults(result.handle, offsets)));
+ return uploadWithMetrics(path, tasks);
} catch (IOException e) {
metrics.getUploadFailuresCounter().inc();
try (Closer closer = Closer.create()) {
@@ -96,19 +91,20 @@ class StateChangeFsUploader implements StateChangeUploader {
closer.register(() -> fileSystem.delete(path, true));
}
}
+ return null; // closer above throws an exception
}
- private LocalResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
+ private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
throws IOException {
metrics.getUploadsCounter().inc();
long start = clock.relativeTimeNanos();
- LocalResult result = upload(path, tasks);
+ UploadTasksResult result = upload(path, tasks);
metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start);
- metrics.getUploadSizes().update(result.handle.getStateSize());
+ metrics.getUploadSizes().update(result.getStateSize());
return result;
}
- private LocalResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
+ private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
boolean wrappedStreamClosed = false;
FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
try {
@@ -121,7 +117,7 @@ class StateChangeFsUploader implements StateChangeUploader {
FileStateHandle handle = new FileStateHandle(path, stream.getPos());
// WARN: streams have to be closed before returning the results
// otherwise JM may receive invalid handles
- return new LocalResult(tasksOffsets, handle);
+ return new UploadTasksResult(tasksOffsets, handle);
} finally {
wrappedStreamClosed = true;
}
@@ -132,17 +128,6 @@ class StateChangeFsUploader implements StateChangeUploader {
}
}
- private static final class LocalResult {
- private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
- private final StreamStateHandle handle;
-
- public LocalResult(
- Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) {
- this.tasksOffsets = tasksOffsets;
- this.handle = handle;
- }
- }
-
private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException {
StreamCompressionDecorator instance =
compression
@@ -153,13 +138,6 @@ class StateChangeFsUploader implements StateChangeUploader {
return new OutputStreamWithPos(new BufferedOutputStream(compressed, bufferSize));
}
- private List<UploadResult> buildResults(
- StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
- return offsets.entrySet().stream()
- .map(e -> UploadResult.of(handle, e.getKey(), e.getValue()))
- .collect(toList());
- }
-
private String generateFileName() {
return UUID.randomUUID().toString();
}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 3396053..7a28165 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -56,18 +56,22 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}. In the simplest form,
- * directly calls {@link StateChangeUploader#upload(Collection)} (UploadTask)}. Other
- * implementations might batch the tasks for efficiency.
+ * directly calls {@link StateChangeUploader#upload(Collection)}. Other implementations might batch
+ * the tasks for efficiency.
*/
interface StateChangeUploadScheduler extends AutoCloseable {
+ /**
+ * Schedule the upload and {@link UploadTask#complete(List) complete} or {@link
+ * UploadTask#fail(Throwable) fail} the corresponding tasks.
+ */
void upload(UploadTask uploadTask) throws IOException;
static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader) {
return new StateChangeUploadScheduler() {
@Override
public void upload(UploadTask uploadTask) throws IOException {
- uploader.upload(singletonList(uploadTask));
+ uploader.upload(singletonList(uploadTask)).complete();
}
@Override
@@ -109,8 +113,8 @@ interface StateChangeUploadScheduler extends AutoCloseable {
@ThreadSafe
final class UploadTask {
final Collection<StateChangeSet> changeSets;
- final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
final Consumer<List<UploadResult>> successCallback;
+ final BiConsumer<List<SequenceNumber>, Throwable> failureCallback;
final AtomicBoolean finished = new AtomicBoolean();
public UploadTask(
@@ -118,8 +122,8 @@ interface StateChangeUploadScheduler extends AutoCloseable {
Consumer<List<UploadResult>> successCallback,
BiConsumer<List<SequenceNumber>, Throwable> failureCallback) {
this.changeSets = new ArrayList<>(changeSets);
- this.failureCallback = failureCallback;
this.successCallback = successCallback;
+ this.failureCallback = failureCallback;
}
public void complete(List<UploadResult> results) {
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
index 6e5df9f..aebe5d2 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
@@ -18,9 +18,16 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.stream.Collectors.toList;
/**
* The purpose of this interface is to abstract the different implementations of uploading state
@@ -28,5 +35,39 @@ import java.util.Collection;
* argument which is meant to initiate such an upload.
*/
interface StateChangeUploader extends AutoCloseable {
- void upload(Collection<UploadTask> tasks) throws IOException;
+ /**
+ * Execute the upload task and return the results. It is the caller's responsibility to {@link
+ * UploadTask#complete(List) complete} the tasks.
+ */
+ UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException;
+
+ final class UploadTasksResult {
+ private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
+ private final StreamStateHandle handle;
+
+ public UploadTasksResult(
+ Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) {
+ this.tasksOffsets = unmodifiableMap(tasksOffsets);
+ this.handle = Preconditions.checkNotNull(handle);
+ }
+
+ public void complete() {
+ for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : tasksOffsets.entrySet()) {
+ UploadTask task = entry.getKey();
+ Map<StateChangeSet, Long> offsets = entry.getValue();
+ task.complete(buildResults(handle, offsets));
+ }
+ }
+
+ private List<UploadResult> buildResults(
+ StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
+ return offsets.entrySet().stream()
+ .map(e -> UploadResult.of(handle, e.getKey(), e.getValue()))
+ .collect(toList());
+ }
+
+ public long getStateSize() {
+ return handle.getStateSize();
+ }
+ }
}
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 7d071f0..d25642c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.function.BiConsumerWithException;
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
@@ -148,7 +150,8 @@ public class BatchingStateChangeUploadSchedulerTest {
final AtomicInteger currentAttempt = new AtomicInteger(0);
@Override
- public void upload(Collection<UploadTask> tasks) throws IOException {
+ public UploadTasksResult upload(Collection<UploadTask> tasks)
+ throws IOException {
for (UploadTask uploadTask : tasks) {
if (currentAttempt.getAndIncrement() < maxAttempts - 1) {
throw new IOException();
@@ -156,6 +159,7 @@ public class BatchingStateChangeUploadSchedulerTest {
uploadTask.complete(emptyList());
}
}
+ return null;
}
},
new DirectScheduledExecutorService(),
@@ -390,9 +394,10 @@ public class BatchingStateChangeUploadSchedulerTest {
private static final class BlockingUploader implements StateChangeUploader {
@Override
- public void upload(Collection<UploadTask> tasks) {
+ public UploadTasksResult upload(Collection<UploadTask> tasks) {
try {
Thread.sleep(Long.MAX_VALUE);
+ return new UploadTasksResult(emptyMap(), new EmptyStreamStateHandle());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index d919451..a4e9e55 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -35,10 +35,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.apache.flink.changelog.fs.ChangelogStorageMetricGroup.CHANGELOG_STORAGE_UPLOAD_QUEUE_SIZE;
@@ -264,25 +265,22 @@ public class ChangelogStorageMetricsTest {
}
@Override
- public void upload(Collection<UploadTask> tasks) throws IOException {
+ public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+ Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
for (UploadTask uploadTask : tasks) {
int currentAttempt = 1 + attemptsPerTask.getOrDefault(uploadTask, 0);
if (currentAttempt == maxAttempts) {
attemptsPerTask.remove(uploadTask);
- uploadTask.complete(Collections.singletonList(getResult(uploadTask)));
+ map.put(
+ uploadTask,
+ uploadTask.changeSets.stream()
+ .collect(Collectors.toMap(Function.identity(), ign -> 0L)));
} else {
attemptsPerTask.put(uploadTask, currentAttempt);
throw new IOException();
}
}
- }
-
- private UploadResult getResult(UploadTask uploadTask) {
- return new UploadResult(
- new EmptyStreamStateHandle(),
- 0,
- uploadTask.changeSets.iterator().next().getSequenceNumber(),
- uploadTask.getSize());
+ return new UploadTasksResult(map, new EmptyStreamStateHandle());
}
@Override
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
index 5b13367..8e29dd1 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
class TestingStateChangeUploader implements StateChangeUploader {
@@ -44,11 +45,12 @@ class TestingStateChangeUploader implements StateChangeUploader {
}
@Override
- public void upload(Collection<UploadTask> tasks) throws IOException {
+ public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
for (UploadTask uploadTask : tasks) {
this.uploaded.addAll(uploadTask.changeSets);
this.tasks.add(uploadTask);
}
+ return new UploadTasksResult(emptyMap(), new ByteStreamStateHandle("", new byte[0]));
}
public Collection<StateChangeSet> getUploaded() {