You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/29 07:04:53 UTC
[flink] branch release-1.15 updated: [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new d5dc354dce6 [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
d5dc354dce6 is described below
commit d5dc354dce663230b38973b285ff31b943da8fff
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jul 19 15:03:26 2022 +0800
[FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
---
.../flink/changelog/fs/StateChangeFsUploader.java | 16 +---
.../changelog/fs/StateChangeFsUploaderTest.java | 95 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 13 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index f2755d469a4..a81bd45f503 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -105,9 +104,7 @@ class StateChangeFsUploader implements StateChangeUploader {
}
private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
- boolean wrappedStreamClosed = false;
- FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
- try {
+ try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) {
fsStream.write(compression ? 1 : 0);
try (OutputStreamWithPos stream = wrap(fsStream)) {
final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets = new HashMap<>();
@@ -118,12 +115,6 @@ class StateChangeFsUploader implements StateChangeUploader {
// WARN: streams have to be closed before returning the results
// otherwise JM may receive invalid handles
return new UploadTasksResult(tasksOffsets, handle);
- } finally {
- wrappedStreamClosed = true;
- }
- } finally {
- if (!wrappedStreamClosed) {
- fsStream.close();
}
}
}
@@ -133,9 +124,8 @@ class StateChangeFsUploader implements StateChangeUploader {
compression
? SnappyStreamCompressionDecorator.INSTANCE
: UncompressedStreamCompressionDecorator.INSTANCE;
- OutputStream compressed =
- compression ? instance.decorateWithCompression(fsStream) : fsStream;
- return new OutputStreamWithPos(new BufferedOutputStream(compressed, bufferSize));
+ return new OutputStreamWithPos(
+ new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
}
private String generateFileName() {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
new file mode 100644
index 00000000000..80c1587c171
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests that {@link StateChangeFsUploader} closes the stream it obtains from the file system after an upload, both without and with compression. */
+class StateChangeFsUploaderTest {
+
+ @TempDir java.nio.file.Path tempFolder; // its URI is the Path handed to the uploader in createUploader
+
+ @Test
+ void testCompression() throws IOException {
+ AtomicBoolean outputStreamClosed = new AtomicBoolean(false); // set to true by the overridden close() below
+
+ FileSystem wrappedFileSystem =
+ new LocalFileSystem() { // local file system whose created streams record that they were closed
+ @Override
+ public FSDataOutputStream create(Path filePath, WriteMode overwrite)
+ throws IOException {
+ checkNotNull(filePath, "filePath");
+
+ if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) { // refuse to replace an existing file
+ throw new FileAlreadyExistsException(
+ "File already exists: " + filePath);
+ }
+
+ final Path parent = filePath.getParent();
+ if (parent != null && !mkdirs(parent)) { // fail if the parent directory cannot be created
+ throw new IOException("Mkdirs failed to create " + parent);
+ }
+
+ final File file = pathToFile(filePath);
+ return new LocalDataOutputStream(file) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ outputStreamClosed.set(true); // reached only when super.close() did not throw
+ }
+ };
+ }
+ };
+
+ StateChangeFsUploader uploader = createUploader(wrappedFileSystem, false); // first run: compression disabled
+ uploader.upload(Collections.emptyList());
+ assertThat(outputStreamClosed.get()).isTrue();
+
+ outputStreamClosed.set(false); // reset the flag before the second run
+ uploader = createUploader(wrappedFileSystem, true); // second run: compression enabled
+ uploader.upload(Collections.emptyList());
+ assertThat(outputStreamClosed.get()).isTrue();
+ }
+
+ private StateChangeFsUploader createUploader(FileSystem fileSystem, boolean compression) {
+ return new StateChangeFsUploader(
+ new Path(tempFolder.toUri()),
+ fileSystem,
+ compression,
+ 4096, // NOTE(review): presumably the write buffer size - confirm against the constructor
+ new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup()));
+ }
+}