You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/29 07:04:53 UTC

[flink] branch release-1.15 updated: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new d5dc354dce6 [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
d5dc354dce6 is described below

commit d5dc354dce663230b38973b285ff31b943da8fff
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jul 19 15:03:26 2022 +0800

    [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
---
 .../flink/changelog/fs/StateChangeFsUploader.java  | 16 +---
 .../changelog/fs/StateChangeFsUploaderTest.java    | 95 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 13 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index f2755d469a4..a81bd45f503 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -105,9 +104,7 @@ class StateChangeFsUploader implements StateChangeUploader {
     }
 
     private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
-        boolean wrappedStreamClosed = false;
-        FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
-        try {
+        try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) {
             fsStream.write(compression ? 1 : 0);
             try (OutputStreamWithPos stream = wrap(fsStream)) {
                 final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets = new HashMap<>();
@@ -118,12 +115,6 @@ class StateChangeFsUploader implements StateChangeUploader {
                 // WARN: streams have to be closed before returning the results
                 // otherwise JM may receive invalid handles
                 return new UploadTasksResult(tasksOffsets, handle);
-            } finally {
-                wrappedStreamClosed = true;
-            }
-        } finally {
-            if (!wrappedStreamClosed) {
-                fsStream.close();
             }
         }
     }
@@ -133,9 +124,8 @@ class StateChangeFsUploader implements StateChangeUploader {
                 compression
                         ? SnappyStreamCompressionDecorator.INSTANCE
                         : UncompressedStreamCompressionDecorator.INSTANCE;
-        OutputStream compressed =
-                compression ? instance.decorateWithCompression(fsStream) : fsStream;
-        return new OutputStreamWithPos(new BufferedOutputStream(compressed, bufferSize));
+        return new OutputStreamWithPos(
+                new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
     }
 
     private String generateFileName() {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
new file mode 100644
index 00000000000..80c1587c171
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Checks that {@link StateChangeFsUploader} closes its file stream, with and without compression. */
+class StateChangeFsUploaderTest {
+
+    @TempDir java.nio.file.Path tempFolder; // base directory handed to the uploader under test
+
+    @Test
+    void testCompression() throws IOException {
+        AtomicBoolean outputStreamClosed = new AtomicBoolean(false);
+        // File system whose output streams record a completed close() in outputStreamClosed.
+        FileSystem wrappedFileSystem =
+                new LocalFileSystem() {
+                    @Override
+                    public FSDataOutputStream create(Path filePath, WriteMode overwrite)
+                            throws IOException {
+                        checkNotNull(filePath, "filePath");
+
+                        if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
+                            throw new FileAlreadyExistsException(
+                                    "File already exists: " + filePath);
+                        }
+
+                        final Path parent = filePath.getParent();
+                        if (parent != null && !mkdirs(parent)) {
+                            throw new IOException("Mkdirs failed to create " + parent);
+                        }
+
+                        final File file = pathToFile(filePath);
+                        return new LocalDataOutputStream(file) {
+                            @Override
+                            public void close() throws IOException {
+                                super.close();
+                                outputStreamClosed.set(true); // reached only if super.close() did not throw
+                            }
+                        };
+                    }
+                };
+        // Compression disabled: the upload must leave the created stream closed.
+        StateChangeFsUploader uploader = createUploader(wrappedFileSystem, false);
+        uploader.upload(Collections.emptyList());
+        assertThat(outputStreamClosed.get()).isTrue();
+        // Compression enabled: reset the flag and expect the same outcome.
+        outputStreamClosed.set(false);
+        uploader = createUploader(wrappedFileSystem, true);
+        uploader.upload(Collections.emptyList());
+        assertThat(outputStreamClosed.get()).isTrue();
+    }
+
+    private StateChangeFsUploader createUploader(FileSystem fileSystem, boolean compression) {
+        return new StateChangeFsUploader(
+                new Path(tempFolder.toUri()),
+                fileSystem,
+                compression,
+                4096, // presumably the write buffer size - confirm against the constructor
+                new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup()));
+    }
+}