Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/28 08:23:36 UTC
[flink] branch master updated: [FLINK-11388][oss] Add Aliyun OSS recoverable writer
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 672d26e [FLINK-11388][oss] Add Aliyun OSS recoverable writer
672d26e is described below
commit 672d26e831047a99b4d6432fe0046ff036279887
Author: Jinhu Wu <wu...@126.com>
AuthorDate: Mon Mar 28 16:22:58 2022 +0800
[FLINK-11388][oss] Add Aliyun OSS recoverable writer
This closes #19072
---
.../flink/fs/osshadoop/FlinkOSSFileSystem.java | 103 +++++++++
.../org/apache/flink/fs/osshadoop/OSSAccessor.java | 100 +++++++++
.../flink/fs/osshadoop/OSSFileSystemFactory.java | 32 ++-
.../flink/fs/osshadoop/writer/OSSCommitter.java | 93 ++++++++
.../flink/fs/osshadoop/writer/OSSRecoverable.java | 83 +++++++
.../writer/OSSRecoverableFsDataOutputStream.java | 186 +++++++++++++++
.../writer/OSSRecoverableMultipartUpload.java | 237 ++++++++++++++++++++
.../osshadoop/writer/OSSRecoverableSerializer.java | 170 ++++++++++++++
.../fs/osshadoop/writer/OSSRecoverableWriter.java | 152 +++++++++++++
.../HadoopOSSRecoverableWriterExceptionITCase.java | 73 ++++++
.../HadoopOSSRecoverableWriterITCase.java | 81 +++++++
.../apache/flink/fs/osshadoop/OSSTestUtils.java | 129 +++++++++++
.../OSSRecoverableFsDataOutputStreamTest.java | 249 +++++++++++++++++++++
.../writer/OSSRecoverableMultipartUploadTest.java | 205 +++++++++++++++++
.../writer/OSSRecoverableSerializerTest.java | 151 +++++++++++++
15 files changed, 2042 insertions(+), 2 deletions(-)
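For context, the new writer plugs into Flink's generic RecoverableWriter API, so the user-facing flow is the usual open/persist/commit cycle. A minimal sketch, assuming an oss:// scheme configured with valid credentials (the bucket and object names are illustrative, not part of this commit):

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.nio.charset.StandardCharsets;

    public class OssRecoverableWriteSketch {
        public static void main(String[] args) throws Exception {
            Path path = new Path("oss://my-bucket/output/part-0"); // hypothetical object
            FileSystem fs = path.getFileSystem();

            RecoverableWriter writer = fs.createRecoverableWriter();
            RecoverableFsDataOutputStream out = writer.open(path);
            out.write("hello oss".getBytes(StandardCharsets.UTF_8));

            // Persist intermediate state (e.g., on a checkpoint) ...
            RecoverableWriter.ResumeRecoverable resumable = out.persist();

            // ... then publish the object atomically.
            out.closeForCommit().commit();
        }
    }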
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java
new file mode 100644
index 0000000..7bce99e
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedTmpFileCreator;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for Aliyun OSS.
+ * This class implements the common behavior provided directly by Flink and delegates the
+ * remaining calls to an implementation of Hadoop's filesystem abstraction.
+ */
+public class FlinkOSSFileSystem extends HadoopFileSystem {
+
+ // Minimum size of each multipart piece, in bytes (10 MB)
+ public static final long MULTIPART_UPLOAD_PART_SIZE_MIN = 10L << 20;
+
+ // Size of each multipart piece, in bytes
+ private long ossUploadPartSize;
+
+ private int maxConcurrentUploadsPerStream;
+
+ private final Executor uploadThreadPool;
+
+ private String localTmpDir;
+
+ private final FunctionWithException<File, RefCountedFileWithStream, IOException>
+ cachedFileCreator;
+
+ private OSSAccessor ossAccessor;
+
+ public FlinkOSSFileSystem(
+ org.apache.hadoop.fs.FileSystem fileSystem,
+ long ossUploadPartSize,
+ int maxConcurrentUploadsPerStream,
+ String localTmpDirectory,
+ OSSAccessor ossAccessor) {
+ super(fileSystem);
+
+ Preconditions.checkArgument(ossUploadPartSize >= MULTIPART_UPLOAD_PART_SIZE_MIN);
+
+ this.ossUploadPartSize = ossUploadPartSize;
+ this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+
+ this.uploadThreadPool = Executors.newCachedThreadPool();
+
+ // Recoverable writer configuration: temporary directory used to cache
+ // data locally before it is uploaded to OSS
+
+ this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
+
+ this.cachedFileCreator =
+ RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
+
+ this.ossAccessor = ossAccessor;
+ }
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.OBJECT_STORE;
+ }
+
+ @Override
+ public RecoverableWriter createRecoverableWriter() throws IOException {
+ return new OSSRecoverableWriter(
+ ossAccessor,
+ ossUploadPartSize,
+ maxConcurrentUploadsPerStream,
+ uploadThreadPool,
+ cachedFileCreator);
+ }
+
+ public String getLocalTmpDir() {
+ return localTmpDir;
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java
new file mode 100644
index 0000000..a515f2a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Accessor for Aliyun OSS used by Flink's recoverable writer. Provides the bridging logic
+ * between Hadoop's abstract filesystem and Aliyun OSS.
+ */
+public class OSSAccessor {
+
+ private AliyunOSSFileSystem fs;
+ private AliyunOSSFileSystemStore store;
+
+ public OSSAccessor(AliyunOSSFileSystem fs) {
+ this.fs = fs;
+ this.store = fs.getStore();
+ }
+
+ public String pathToObject(final Path path) {
+ org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
+ if (!hadoopPath.isAbsolute()) {
+ hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
+ }
+
+ return hadoopPath.toUri().getPath().substring(1);
+ }
+
+ public Path objectToPath(String object) {
+ return new Path("/" + object);
+ }
+
+ public String startMultipartUpload(String objectName) {
+ return store.getUploadId(objectName);
+ }
+
+ public boolean deleteObject(String objectName) throws IOException {
+ return fs.delete(new org.apache.hadoop.fs.Path('/' + objectName), false);
+ }
+
+ public CompleteMultipartUploadResult completeMultipartUpload(
+ String objectName, String uploadId, List<PartETag> partETags) {
+ return store.completeMultipartUpload(objectName, uploadId, partETags);
+ }
+
+ public PartETag uploadPart(File file, String objectName, String uploadId, int idx)
+ throws IOException {
+ return store.uploadPart(file, objectName, uploadId, idx);
+ }
+
+ public void putObject(String objectName, File file) throws IOException {
+ store.uploadObject(objectName, file);
+ }
+
+ public void getObject(String objectName, String dstPath, long length) throws IOException {
+ long contentLength = store.getObjectMetadata(objectName).getContentLength();
+ if (contentLength != length) {
+ throw new IOException(
+ String.format(
+ "Error recovering writer: "
+ + "Downloading the last data chunk file gives incorrect length."
+ + "File length is %d bytes, RecoveryData indicates %d bytes",
+ contentLength, length));
+ }
+
+ org.apache.hadoop.fs.Path srcPath = new org.apache.hadoop.fs.Path("/" + objectName);
+ org.apache.hadoop.fs.Path localPath = new org.apache.hadoop.fs.Path(dstPath);
+ fs.copyToLocalFile(srcPath, localPath);
+
+ String crcFileName = "." + localPath.getName() + ".crc";
+ (new File(localPath.getParent().toString() + "/" + crcFileName)).delete();
+ }
+}
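The accessor above is a thin wrapper around Hadoop's AliyunOSSFileSystemStore. A hedged sketch of the multipart lifecycle it exposes, assuming an already-initialized accessor (file and object names are illustrative):

    // Sketch only: `accessor` wraps an initialized AliyunOSSFileSystem.
    void multipartLifecycle(OSSAccessor accessor, java.io.File localChunk) throws Exception {
        String object = accessor.pathToObject(new org.apache.flink.core.fs.Path("/output/part-0"));
        String uploadId = accessor.startMultipartUpload(object);

        java.util.List<com.aliyun.oss.model.PartETag> etags = new java.util.ArrayList<>();
        // Part numbers start at 1 (OSS caps them at 10,000 per upload).
        etags.add(accessor.uploadPart(localChunk, object, uploadId, 1));

        accessor.completeMultipartUpload(object, uploadId, etags);
    }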
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
index 6a1aca7..768f417 100644
--- a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
@@ -19,10 +19,13 @@
package org.apache.flink.fs.osshadoop;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.slf4j.Logger;
@@ -52,6 +55,23 @@ public class OSSFileSystemFactory implements FileSystemFactory {
*/
private static final String[] FLINK_CONFIG_PREFIXES = {"fs.oss."};
+ public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
+ ConfigOptions.key("oss.upload.min.part.size")
+ .defaultValue(FlinkOSSFileSystem.MULTIPART_UPLOAD_PART_SIZE_MIN)
+ .withDescription(
+ "This option is relevant to the Recoverable Writer and sets the min size of data that "
+ + "buffered locally, before being sent to OSS. Flink also takes care of checkpoint locally "
+ + "buffered data. This value cannot be less than 100KB or greater than 5GB (limits set by Aliyun OSS).");
+
+ public static final ConfigOption<Integer> MAX_CONCURRENT_UPLOADS =
+ ConfigOptions.key("oss.upload.max.concurrent.uploads")
+ .defaultValue(Runtime.getRuntime().availableProcessors())
+ .withDescription(
+ "This option is relevant to the Recoverable Writer and limits the number of "
+ + "parts that can be concurrently in-flight. By default, this is set to "
+ + Runtime.getRuntime().availableProcessors()
+ + ".");
+
@Override
public String getScheme() {
return "oss";
@@ -81,7 +101,15 @@ public class OSSFileSystemFactory implements FileSystemFactory {
final AliyunOSSFileSystem fs = new AliyunOSSFileSystem();
fs.initialize(fsUri, hadoopConfig);
- return new HadoopFileSystem(fs);
+ final String[] localTmpDirectories = ConfigurationUtils.parseTempDirectories(flinkConfig);
+ Preconditions.checkArgument(localTmpDirectories.length > 0);
+ final String localTmpDirectory = localTmpDirectories[0];
+ return new FlinkOSSFileSystem(
+ fs,
+ flinkConfig.getLong(PART_UPLOAD_MIN_SIZE),
+ flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS),
+ localTmpDirectory,
+ new OSSAccessor(fs));
}
@VisibleForTesting
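With the two options above wired into the factory, tuning the writer could look like this in flink-conf.yaml (the endpoint and values are illustrative):

    # flink-conf.yaml (illustrative values)
    fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
    fs.oss.accessKeyId: <your-access-key-id>
    fs.oss.accessKeySecret: <your-access-key-secret>

    # Recoverable writer tuning introduced by this commit
    oss.upload.min.part.size: 10485760        # 10 MB, the enforced minimum
    oss.upload.max.concurrent.uploads: 4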
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java
new file mode 100644
index 0000000..699c465
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+
+import com.aliyun.oss.model.PartETag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Committer that completes an OSS MultiPartUpload. */
+public class OSSCommitter implements RecoverableFsDataOutputStream.Committer {
+ private static final Logger LOG = LoggerFactory.getLogger(OSSCommitter.class);
+
+ private OSSAccessor ossAccessor;
+ private String objectName;
+ private String uploadId;
+ private List<PartETag> partETags;
+ private long totalLength;
+
+ public OSSCommitter(
+ OSSAccessor ossAccessor,
+ String objectName,
+ String uploadId,
+ List<PartETag> partETags,
+ long totalLength) {
+ this.ossAccessor = checkNotNull(ossAccessor);
+ this.objectName = checkNotNull(objectName);
+ this.uploadId = checkNotNull(uploadId);
+ this.partETags = checkNotNull(partETags);
+ this.totalLength = totalLength;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (totalLength > 0L) {
+ LOG.info("Committing {} with multi-part upload ID {}", objectName, uploadId);
+ ossAccessor.completeMultipartUpload(objectName, uploadId, partETags);
+ } else {
+ LOG.debug("No data to commit for file: {}", objectName);
+ }
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+ if (totalLength > 0L) {
+ try {
+ LOG.info(
+ "Trying to commit after recovery {} with multi-part upload ID {}",
+ objectName,
+ uploadId);
+ ossAccessor.completeMultipartUpload(objectName, uploadId, partETags);
+ } catch (Exception e) {
+ LOG.info(
+ "Failed to commit after recovery {} with multi-part upload ID {}. "
+ + "exception {}",
+ objectName,
+ uploadId,
+ e);
+ }
+ } else {
+ LOG.debug("No data to commit for file: {}", objectName);
+ }
+ }
+
+ @Override
+ public RecoverableWriter.CommitRecoverable getRecoverable() {
+ return new OSSRecoverable(uploadId, objectName, partETags, null, totalLength, 0);
+ }
+}
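A hedged sketch of how this committer participates in recovery: the CommitRecoverable is stored in checkpoint state and replayed through the writer after a restore (the writer and stream variables are as in the write sketch near the top of this mail):

    // Close the stream and obtain a committer, but defer the actual commit.
    RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
    RecoverableWriter.CommitRecoverable pending = committer.getRecoverable();

    // ... persist `pending` in checkpoint state; after a restore:
    writer.recoverForCommit(pending).commitAfterRecovery();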
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java
new file mode 100644
index 0000000..c177e1a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import com.aliyun.oss.model.PartETag;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Data object to recover an OSS MultiPartUpload for a recoverable output stream. */
+public class OSSRecoverable implements RecoverableWriter.ResumeRecoverable {
+ private final String uploadId;
+
+ private final String objectName;
+
+ private final List<PartETag> partETags;
+
+ @Nullable private final String lastPartObject;
+
+ private long numBytesInParts;
+
+ private long lastPartObjectLength;
+
+ public OSSRecoverable(
+ String uploadId,
+ String objectName,
+ List<PartETag> partETags,
+ String lastPartObject,
+ long numBytesInParts,
+ long lastPartObjectLength) {
+ this.uploadId = uploadId;
+ this.objectName = objectName;
+ this.partETags = new ArrayList<>(partETags);
+ this.lastPartObject = lastPartObject;
+ this.numBytesInParts = numBytesInParts;
+ this.lastPartObjectLength = lastPartObjectLength;
+ }
+
+ public String getUploadId() {
+ return uploadId;
+ }
+
+ public String getObjectName() {
+ return objectName;
+ }
+
+ public List<PartETag> getPartETags() {
+ return partETags;
+ }
+
+ @Nullable
+ public String getLastPartObject() {
+ return lastPartObject;
+ }
+
+ public long getNumBytesInParts() {
+ return numBytesInParts;
+ }
+
+ public long getLastPartObjectLength() {
+ return lastPartObjectLength;
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..eb8ce7a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.commons.io.IOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A RecoverableFsDataOutputStream to OSS that is based on a recoverable multipart upload.
+ *
+ * <p>This class is NOT thread-safe. Concurrent writes to this stream result in corrupt or lost
+ * data.
+ *
+ * <p>The {@link #close()} method may be called concurrently when cancelling / shutting down. It
+ * will still ensure that local transient resources (like streams and temp files) are cleaned up,
+ * but will not touch data previously persisted in OSS.
+ */
+@PublicEvolving
+@NotThreadSafe
+public class OSSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+ /**
+ * Lock that guards the critical sections when new parts are rolled over. Despite the class
+ * being declared not thread safe, we protect certain regions to at least enable concurrent
+ * close() calls during cancellation or abort/cleanup.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private long ossUploadPartSize;
+ private FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator;
+
+ private RefCountedBufferingFileStream fileStream;
+
+ private OSSRecoverableMultipartUpload upload;
+ private long sizeBeforeCurrentPart;
+
+ public OSSRecoverableFsDataOutputStream(
+ long ossUploadPartSize,
+ FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator,
+ OSSRecoverableMultipartUpload upload,
+ long sizeBeforeCurrentPart)
+ throws IOException {
+ this.ossUploadPartSize = ossUploadPartSize;
+ this.cachedFileCreator = cachedFileCreator;
+ this.upload = upload;
+
+ if (upload.getIncompletePart().isPresent()) {
+ this.fileStream =
+ RefCountedBufferingFileStream.restore(
+ this.cachedFileCreator, upload.getIncompletePart().get());
+ } else {
+ this.fileStream = RefCountedBufferingFileStream.openNew(this.cachedFileCreator);
+ }
+ this.sizeBeforeCurrentPart = sizeBeforeCurrentPart;
+ }
+
+ @Override
+ public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+ lock();
+ try {
+ fileStream.flush();
+
+ switchNewPartFileIfNecessary(ossUploadPartSize);
+
+ return upload.getRecoverable(fileStream);
+ } finally {
+ unlock();
+ }
+ }
+
+ @Override
+ public Committer closeForCommit() throws IOException {
+ lock();
+ try {
+ uploadCurrentPart();
+ return upload.getCommitter();
+ } finally {
+ unlock();
+ }
+ }
+
+ private void uploadCurrentPart() throws IOException {
+ fileStream.flush();
+ fileStream.close();
+ if (fileStream.getPos() > 0L) {
+ upload.uploadPart(fileStream);
+ }
+ fileStream.release();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return sizeBeforeCurrentPart + fileStream.getPos();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ fileStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ fileStream.write(b, off, len);
+ switchNewPartFileIfNecessary(ossUploadPartSize);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ fileStream.flush();
+ switchNewPartFileIfNecessary(ossUploadPartSize);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ fileStream.sync();
+ }
+
+ @Override
+ public void close() throws IOException {
+ lock();
+ try {
+ fileStream.flush();
+ } finally {
+ IOUtils.closeQuietly(fileStream);
+ fileStream.release();
+ unlock();
+ }
+ }
+
+ private void lock() throws IOException {
+ try {
+ lock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("interrupted exception: " + e);
+ }
+ }
+
+ private void unlock() {
+ lock.unlock();
+ }
+
+ private void switchNewPartFileIfNecessary(long partSizeThreshold) throws IOException {
+ final long length = fileStream.getPos();
+ if (length >= partSizeThreshold) {
+ lock();
+ try {
+ sizeBeforeCurrentPart += fileStream.getPos();
+
+ uploadCurrentPart();
+
+ fileStream = RefCountedBufferingFileStream.openNew(cachedFileCreator);
+ } finally {
+ unlock();
+ }
+ }
+ }
+}
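Worth noting from the stream above: close() only flushes and releases local resources, while closeForCommit() uploads the final part and hands back a committer. A hedged illustration (writer, path, and data as in the earlier sketches):

    // Path A: publish the object.
    RecoverableFsDataOutputStream out1 = writer.open(path);
    out1.write(data);
    out1.closeForCommit().commit(); // final part uploaded, object becomes visible

    // Path B: abandon the attempt.
    RecoverableFsDataOutputStream out2 = writer.open(path);
    out2.write(data);
    out2.close(); // local temp files cleaned up, nothing published to OSS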
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java
new file mode 100644
index 0000000..16d2fc6
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+
+import com.aliyun.oss.model.PartETag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Uploader for OSS multi-part uploads. */
+public class OSSRecoverableMultipartUpload {
+ private static final Logger LOG = LoggerFactory.getLogger(OSSRecoverableMultipartUpload.class);
+
+ private String objectName;
+ private String uploadId;
+ private List<PartETag> completeParts;
+ private Optional<File> incompletePart;
+ private Executor uploadThreadPool;
+ private OSSAccessor ossAccessor;
+ private Deque<CompletableFuture<PartETag>> uploadsInProgress;
+ private int numberOfRegisteredParts;
+ private long expectedSizeInBytes;
+ private final String namePrefixForTempObjects;
+
+ public OSSRecoverableMultipartUpload(
+ String objectName,
+ Executor uploadThreadPool,
+ OSSAccessor ossAccessor,
+ Optional<File> incompletePart,
+ String uploadId,
+ List<PartETag> completeParts,
+ long expectedSizeInBytes) {
+ this.objectName = objectName;
+ if (completeParts != null) {
+ this.completeParts = completeParts;
+ } else {
+ this.completeParts = new ArrayList<>();
+ }
+
+ this.incompletePart = incompletePart;
+ if (uploadId != null) {
+ this.uploadId = uploadId;
+ } else {
+ this.uploadId = ossAccessor.startMultipartUpload(objectName);
+ }
+ this.uploadThreadPool = uploadThreadPool;
+ this.ossAccessor = ossAccessor;
+ this.uploadsInProgress = new ArrayDeque<>();
+ this.numberOfRegisteredParts = this.completeParts.size();
+ this.expectedSizeInBytes = expectedSizeInBytes;
+ this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
+ }
+
+ public Optional<File> getIncompletePart() {
+ return incompletePart;
+ }
+
+ public void uploadPart(RefCountedFSOutputStream file) throws IOException {
+ checkState(file.isClosed());
+
+ final CompletableFuture<PartETag> future = new CompletableFuture<>();
+ uploadsInProgress.add(future);
+
+ numberOfRegisteredParts += 1;
+ expectedSizeInBytes += file.getPos();
+
+ file.retain();
+ uploadThreadPool.execute(
+ new UploadTask(
+ ossAccessor, objectName, uploadId, numberOfRegisteredParts, file, future));
+ }
+
+ public OSSRecoverable getRecoverable(RefCountedFSOutputStream file) throws IOException {
+ String incompletePartObjectName = uploadSmallPart(file);
+
+ checkState(numberOfRegisteredParts - completeParts.size() == uploadsInProgress.size());
+
+ while (numberOfRegisteredParts - completeParts.size() > 0) {
+ CompletableFuture<PartETag> next = uploadsInProgress.peekFirst();
+ PartETag nextPart;
+ try {
+ nextPart = next.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for part uploads to complete");
+ } catch (ExecutionException e) {
+ throw new IOException("Uploading parts failed ", e.getCause());
+ }
+
+ completeParts.add(nextPart);
+ uploadsInProgress.removeFirst();
+ }
+
+ if (file == null) {
+ return new OSSRecoverable(
+ uploadId, objectName, completeParts, null, expectedSizeInBytes, 0);
+ } else {
+ return new OSSRecoverable(
+ uploadId,
+ objectName,
+ completeParts,
+ incompletePartObjectName,
+ expectedSizeInBytes,
+ file.getPos());
+ }
+ }
+
+ private String uploadSmallPart(@Nullable RefCountedFSOutputStream file) throws IOException {
+ if (file == null || file.getPos() == 0L) {
+ return null;
+ }
+
+ String incompletePartObjectName = createIncompletePartObjectName();
+ file.retain();
+
+ try {
+ ossAccessor.putObject(incompletePartObjectName, file.getInputFile());
+ } finally {
+ file.release();
+ }
+ return incompletePartObjectName;
+ }
+
+ private String createIncompletePartObjectName() {
+ return namePrefixForTempObjects + UUID.randomUUID().toString();
+ }
+
+ @VisibleForTesting
+ static String createIncompletePartObjectNamePrefix(String objectName) {
+ checkNotNull(objectName);
+
+ final int lastSlash = objectName.lastIndexOf('/');
+ final String parent;
+ final String child;
+
+ if (lastSlash == -1) {
+ parent = "";
+ child = objectName;
+ } else {
+ parent = objectName.substring(0, lastSlash + 1);
+ child = objectName.substring(lastSlash + 1);
+ }
+ return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
+ }
+
+ public OSSCommitter getCommitter() throws IOException {
+ final OSSRecoverable recoverable = getRecoverable(null);
+ return new OSSCommitter(
+ ossAccessor,
+ recoverable.getObjectName(),
+ recoverable.getUploadId(),
+ recoverable.getPartETags(),
+ recoverable.getNumBytesInParts());
+ }
+
+ private static class UploadTask implements Runnable {
+ private final OSSAccessor ossAccessor;
+
+ private final String objectName;
+
+ private final String uploadId;
+
+ private final int partNumber;
+
+ private final RefCountedFSOutputStream file;
+
+ private final CompletableFuture<PartETag> future;
+
+ UploadTask(
+ OSSAccessor ossAccessor,
+ String objectName,
+ String uploadId,
+ int partNumber,
+ RefCountedFSOutputStream file,
+ CompletableFuture<PartETag> future) {
+ this.ossAccessor = ossAccessor;
+ this.objectName = objectName;
+ this.uploadId = uploadId;
+
+ checkArgument(partNumber >= 1 && partNumber <= 10_000);
+ this.partNumber = partNumber;
+
+ this.file = file;
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ try {
+ PartETag partETag =
+ ossAccessor.uploadPart(
+ file.getInputFile(), objectName, uploadId, partNumber);
+ future.complete(partETag);
+ file.release();
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }
+ }
+}
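For reference, the temp-object naming helper above keeps in-progress small parts next to the final object. Per the code, for example:

    createIncompletePartObjectNamePrefix("dir/part-0")  ->  "dir/_part-0_tmp_"
    createIncompletePartObjectNamePrefix("part-0")      ->  "_part-0_tmp_"
    // each incomplete-part object then appends a random UUID to this prefix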
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java
new file mode 100644
index 0000000..d92b60d
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.aliyun.oss.model.PartETag;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+/** Serializer implementation for a {@link OSSRecoverable}. */
+@Internal
+public class OSSRecoverableSerializer implements SimpleVersionedSerializer<OSSRecoverable> {
+ static final OSSRecoverableSerializer INSTANCE = new OSSRecoverableSerializer();
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private static final int MAGIC_NUMBER = 0x98761234;
+
+ private OSSRecoverableSerializer() {}
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(OSSRecoverable ossRecoverable) throws IOException {
+ final byte[] objectBytes = ossRecoverable.getObjectName().getBytes(CHARSET);
+ final byte[] uploadIdBytes = ossRecoverable.getUploadId().getBytes(CHARSET);
+
+ final byte[][] etags = new byte[ossRecoverable.getPartETags().size()][];
+
+ int partEtagBytes = 0;
+ for (int i = 0; i < ossRecoverable.getPartETags().size(); i++) {
+ etags[i] = ossRecoverable.getPartETags().get(i).getETag().getBytes(CHARSET);
+ partEtagBytes += etags[i].length + 2 * Integer.BYTES;
+ }
+
+ final String lastObjectKey = ossRecoverable.getLastPartObject();
+ final byte[] lastPartBytes = lastObjectKey == null ? null : lastObjectKey.getBytes(CHARSET);
+
+ /*
+  * Serialized layout:
+  *   magic number
+  *   object name length + object name bytes
+  *   upload id length + upload id bytes
+  *   etags length + (part number + etag length + etag bytes)*
+  *   number of bytes in parts
+  *   last part bytes length + last part bytes
+  *   last part object length
+  */
+ final byte[] resultBytes =
+ new byte
+ [Integer.BYTES
+ + Integer.BYTES
+ + objectBytes.length
+ + Integer.BYTES
+ + uploadIdBytes.length
+ + Integer.BYTES
+ + partEtagBytes
+ + Long.BYTES
+ + Integer.BYTES
+ + (lastPartBytes == null ? 0 : lastPartBytes.length)
+ + Long.BYTES];
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(resultBytes).order(ByteOrder.LITTLE_ENDIAN);
+
+ byteBuffer.putInt(MAGIC_NUMBER);
+
+ byteBuffer.putInt(objectBytes.length);
+ byteBuffer.put(objectBytes);
+
+ byteBuffer.putInt(uploadIdBytes.length);
+ byteBuffer.put(uploadIdBytes);
+
+ byteBuffer.putInt(etags.length);
+ for (int i = 0; i < ossRecoverable.getPartETags().size(); i++) {
+ PartETag pe = ossRecoverable.getPartETags().get(i);
+ byteBuffer.putInt(pe.getPartNumber());
+ byteBuffer.putInt(etags[i].length);
+ byteBuffer.put(etags[i]);
+ }
+
+ byteBuffer.putLong(ossRecoverable.getNumBytesInParts());
+
+ if (lastPartBytes == null) {
+ byteBuffer.putInt(0);
+ } else {
+ byteBuffer.putInt(lastPartBytes.length);
+ byteBuffer.put(lastPartBytes);
+ }
+
+ byteBuffer.putLong(ossRecoverable.getLastPartObjectLength());
+
+ return resultBytes;
+ }
+
+ @Override
+ public OSSRecoverable deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private OSSRecoverable deserializeV1(byte[] serialized) throws IOException {
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+ final int magicNumber = byteBuffer.getInt();
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IOException("Corrupt data: Unexpected magic number " + magicNumber);
+ }
+
+ final byte[] objectBytes = new byte[byteBuffer.getInt()];
+ byteBuffer.get(objectBytes);
+
+ final byte[] uploadIdBytes = new byte[byteBuffer.getInt()];
+ byteBuffer.get(uploadIdBytes);
+
+ final int numParts = byteBuffer.getInt();
+ final ArrayList<PartETag> parts = new ArrayList<>(numParts);
+ for (int i = 0; i < numParts; i++) {
+ final int partNum = byteBuffer.getInt();
+ final byte[] buffer = new byte[byteBuffer.getInt()];
+ byteBuffer.get(buffer);
+ parts.add(new PartETag(partNum, new String(buffer, CHARSET)));
+ }
+
+ final long numBytes = byteBuffer.getLong();
+
+ final String lastPart;
+ final int lastObjectArraySize = byteBuffer.getInt();
+ if (lastObjectArraySize == 0) {
+ lastPart = null;
+ } else {
+ byte[] lastPartBytes = new byte[lastObjectArraySize];
+ byteBuffer.get(lastPartBytes);
+ lastPart = new String(lastPartBytes, CHARSET);
+ }
+
+ final long lastPartLength = byteBuffer.getLong();
+
+ return new OSSRecoverable(
+ new String(uploadIdBytes, CHARSET),
+ new String(objectBytes, CHARSET),
+ parts,
+ lastPart,
+ numBytes,
+ lastPartLength);
+ }
+}
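A hedged round-trip sketch for the serializer; INSTANCE is package-private, so this only works from within org.apache.flink.fs.osshadoop.writer (the values are illustrative):

    OSSRecoverable original =
            new OSSRecoverable("upload-id", "dir/part-0", new ArrayList<>(), null, 0L, 0L);
    byte[] bytes = OSSRecoverableSerializer.INSTANCE.serialize(original);
    OSSRecoverable restored =
            OSSRecoverableSerializer.INSTANCE.deserialize(
                    OSSRecoverableSerializer.INSTANCE.getVersion(), bytes);
    // restored mirrors original: same upload id, object name, etags, and lengths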
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java
new file mode 100644
index 0000000..266d259
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.BackPressuringExecutor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation of the {@link RecoverableWriter} against OSS.
+ *
+ * <p>This implementation makes heavy use of MultiPart Uploads in OSS to persist intermediate data
+ * as soon as possible.
+ */
+public class OSSRecoverableWriter implements RecoverableWriter {
+
+ private OSSAccessor ossAccessor;
+ private long ossUploadPartSize;
+ private int streamConcurrentUploads;
+ private Executor executor;
+
+ private final FunctionWithException<File, RefCountedFileWithStream, IOException>
+ cachedFileCreator;
+
+ public OSSRecoverableWriter(
+ OSSAccessor ossAccessor,
+ long ossUploadPartSize,
+ int streamConcurrentUploads,
+ Executor executor,
+ FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator) {
+ this.ossAccessor = ossAccessor;
+ this.ossUploadPartSize = ossUploadPartSize;
+ this.streamConcurrentUploads = streamConcurrentUploads;
+ this.executor = executor;
+ this.cachedFileCreator = cachedFileCreator;
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream open(Path path) throws IOException {
+ return new OSSRecoverableFsDataOutputStream(
+ ossUploadPartSize,
+ cachedFileCreator,
+ new OSSRecoverableMultipartUpload(
+ ossAccessor.pathToObject(path),
+ getExecutor(),
+ ossAccessor,
+ Optional.empty(),
+ null,
+ null,
+ 0),
+ 0L);
+ }
+
+ @Override
+ public OSSRecoverableFsDataOutputStream recover(ResumeRecoverable resumable)
+ throws IOException {
+ final OSSRecoverable recoverable = (OSSRecoverable) resumable;
+ OSSRecoverableMultipartUpload upload =
+ new OSSRecoverableMultipartUpload(
+ recoverable.getObjectName(),
+ getExecutor(),
+ ossAccessor,
+ recoverInProgressPart(recoverable),
+ recoverable.getUploadId(),
+ recoverable.getPartETags(),
+ recoverable.getNumBytesInParts());
+ return new OSSRecoverableFsDataOutputStream(
+ ossUploadPartSize, cachedFileCreator, upload, recoverable.getNumBytesInParts());
+ }
+
+ private Optional<File> recoverInProgressPart(OSSRecoverable recoverable) throws IOException {
+ String objectKey = recoverable.getLastPartObject();
+ if (objectKey == null) {
+ return Optional.empty();
+ }
+
+ final RefCountedFileWithStream refCountedFile = cachedFileCreator.apply(null);
+ final File file = refCountedFile.getFile();
+
+ ossAccessor.getObject(
+ objectKey, file.getAbsolutePath(), recoverable.getLastPartObjectLength());
+
+ return Optional.of(file);
+ }
+
+ @Override
+ public boolean requiresCleanupOfRecoverableState() {
+ return true;
+ }
+
+ @Override
+ public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+ final OSSRecoverable ossRecoverable = (OSSRecoverable) resumable;
+ final String smallPartObjectToDelete = ossRecoverable.getLastPartObject();
+ return smallPartObjectToDelete != null && ossAccessor.deleteObject(smallPartObjectToDelete);
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable)
+ throws IOException {
+ final OSSRecoverableFsDataOutputStream recovered = recover((OSSRecoverable) resumable);
+ return recovered.closeForCommit();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+ return (SimpleVersionedSerializer) OSSRecoverableSerializer.INSTANCE;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+ return (SimpleVersionedSerializer) OSSRecoverableSerializer.INSTANCE;
+ }
+
+ @Override
+ public boolean supportsResume() {
+ return true;
+ }
+
+ private Executor getExecutor() {
+ if (streamConcurrentUploads <= 0) {
+ return executor;
+ }
+ return new BackPressuringExecutor(executor, streamConcurrentUploads);
+ }
+}
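Putting the writer and serializer together: Flink persists recoverables in checkpoint state via org.apache.flink.core.io.SimpleVersionedSerialization. A hedged sketch (stream and writer variables as in the earlier sketches):

    SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> ser =
            writer.getResumeRecoverableSerializer();
    RecoverableWriter.ResumeRecoverable resumable = out.persist();
    byte[] state = SimpleVersionedSerialization.writeVersionAndSerialize(ser, resumable);

    // ... store `state` in checkpointed state; on restore:
    RecoverableWriter.ResumeRecoverable restored =
            SimpleVersionedSerialization.readVersionAndDeSerialize(ser, state);
    RecoverableFsDataOutputStream resumed = writer.recover(restored);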
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java
new file mode 100644
index 0000000..ed4964b
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterExceptionITCase;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.MAX_CONCURRENT_UPLOADS;
+
+/**
+ * Tests for exception throwing in the {@link
+ * org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter OSSRecoverableWriter}.
+ */
+public class HadoopOSSRecoverableWriterExceptionITCase
+ extends AbstractHadoopRecoverableWriterExceptionITCase {
+
+ // ----------------------- OSS general configuration -----------------------
+
+ private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+ @BeforeClass
+ public static void checkCredentialsAndSetup() throws IOException {
+ // check whether credentials exist
+ OSSTestCredentials.assumeCredentialsAvailable();
+
+ basePath = new Path(OSSTestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
+
+ // initialize configuration with valid credentials
+ final Configuration conf = new Configuration();
+ conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+ conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+ conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+
+ conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+ final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "/oss_tmp_dir";
+ conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+ FileSystem.initialize(conf);
+
+ skipped = false;
+ }
+
+ @Override
+ protected String getLocalTmpDir() throws Exception {
+ return ((FlinkOSSFileSystem) getFileSystem()).getLocalTmpDir();
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java
new file mode 100644
index 0000000..715f7ce
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverable;
+import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterITCase;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.MAX_CONCURRENT_UPLOADS;
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.PART_UPLOAD_MIN_SIZE;
+
+/**
+ * Tests for the {@link org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter
+ * OSSRecoverableWriter}.
+ */
+public class HadoopOSSRecoverableWriterITCase extends AbstractHadoopRecoverableWriterITCase {
+
+ // ----------------------- OSS general configuration -----------------------
+
+ private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+ @BeforeClass
+ public static void checkCredentialsAndSetup() throws IOException {
+ // check whether credentials exist
+ OSSTestCredentials.assumeCredentialsAvailable();
+
+ basePath = new Path(OSSTestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
+
+ // initialize configuration with valid credentials
+ final Configuration conf = new Configuration();
+ conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+ conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+ conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+
+ conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+ final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "/oss_tmp_dir";
+ conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+ FileSystem.initialize(conf);
+ bigDataChunk =
+ createBigDataChunk(BIG_CHUNK_DATA_PATTERN, PART_UPLOAD_MIN_SIZE.defaultValue());
+ skipped = false;
+ }
+
+ @Override
+ protected String getLocalTmpDir() throws Exception {
+ return ((FlinkOSSFileSystem) getFileSystem()).getLocalTmpDir();
+ }
+
+ @Override
+ protected String getIncompleteObjectName(RecoverableWriter.ResumeRecoverable recoverable) {
+ return ((OSSRecoverable) recoverable).getLastPartObject();
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java
new file mode 100644
index 0000000..47330cf
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverableMultipartUpload;
+
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SplittableRandom;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertEquals;
+
+/** OSS test utility class. */
+public class OSSTestUtils {
+ private static final int BUFFER_SIZE = 10;
+
+ public static void objectContentEquals(
+ FileSystem fs, Path objectPath, List<byte[]> expectContents) throws IOException {
+ String actualContent;
+ try (FSDataInputStream in = fs.open(objectPath);
+ ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ byte[] buffer = new byte[4096];
+ int bytes = in.read(buffer);
+ while (bytes != -1) {
+ out.write(buffer, 0, bytes);
+ bytes = in.read(buffer);
+ }
+ actualContent = out.toString(StandardCharsets.UTF_8.name());
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (byte[] bytes : expectContents) {
+ out.write(bytes);
+ }
+ assertEquals(out.toString(), actualContent);
+ }
+
+ public static void objectContentEquals(FileSystem fs, Path objectPath, byte[]... expectContents)
+ throws IOException {
+ objectContentEquals(fs, objectPath, Arrays.asList(expectContents));
+ }
+
+ public static byte[] bytesOf(String str, long requiredSize) {
+ StringBuilder sb = new StringBuilder();
+ while (sb.length() < requiredSize) {
+ sb.append(str);
+ }
+ return sb.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ public static void uploadPart(
+ OSSRecoverableMultipartUpload uploader,
+ final TemporaryFolder temporaryFolder,
+ final byte[] content)
+ throws IOException {
+ RefCountedBufferingFileStream partFile = writeData(temporaryFolder, content);
+
+ partFile.close();
+
+ uploader.uploadPart(partFile);
+ }
+
+ public static RefCountedBufferingFileStream writeData(
+ TemporaryFolder temporaryFolder, byte[] content) throws IOException {
+ final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+ final OutputStream out =
+ Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+ final RefCountedBufferingFileStream testStream =
+ new RefCountedBufferingFileStream(
+ RefCountedFileWithStream.newFile(newFile, out), BUFFER_SIZE);
+
+ testStream.write(content, 0, content.length);
+ return testStream;
+ }
+
+ public static List<byte[]> generateRandomBuffer(long size, int partSize) {
+ List<byte[]> buffers = new ArrayList<>();
+
+ final SplittableRandom random = new SplittableRandom();
+
+ long totalSize = 0L;
+
+ while (totalSize < size) {
+ int bufferSize = random.nextInt(0, 2 * partSize);
+ byte[] buffer = new byte[bufferSize];
+ for (int i = 0; i < bufferSize; ++i) {
+ buffer[i] = (byte) (random.nextInt() & 0xFF);
+ }
+
+ buffers.add(buffer);
+ totalSize += bufferSize;
+ }
+
+ return buffers;
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java
new file mode 100644
index 0000000..d452a5f
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.OSSTestUtils;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertFalse;
+
+/** Tests for the {@link OSSRecoverableFsDataOutputStream}. */
+public class OSSRecoverableFsDataOutputStreamTest {
+
+ private static Path basePath;
+
+ private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+ private FileSystem fs;
+
+ private static final String TEST_OBJECT_NAME_PREFIX = "TEST-OBJECT-";
+
+ private Path objectPath;
+
+ private RecoverableWriter writer;
+
+ private RecoverableFsDataOutputStream fsDataOutputStream;
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
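+ // Runs against a real OSS bucket: skipped via assumption when no test credentials are set,
+ // and each test writes to a fresh random object under a per-run test directory.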
+ @Before
+ public void before() throws IOException {
+ OSSTestCredentials.assumeCredentialsAvailable();
+
+ final Configuration conf = new Configuration();
+ conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+ conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+ conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+ FileSystem.initialize(conf);
+
+ basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+ fs = basePath.getFileSystem();
+ writer = fs.createRecoverableWriter();
+
+ objectPath = new Path(basePath + "/" + TEST_OBJECT_NAME_PREFIX + UUID.randomUUID());
+
+ fsDataOutputStream = writer.open(objectPath);
+ }
+
+ @Test
+ public void testRegularDataWritten() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+ fsDataOutputStream.write(part);
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, part);
+ }
+
+ @Test
+ public void testNoDataWritten() throws IOException {
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ // committing a stream that received no data must not create an empty object
+ assertFalse(fs.exists(objectPath));
+ }
+
+ @Test(expected = IOException.class)
+ public void testCloseForCommitOnClosedStreamShouldFail() throws IOException {
+ fsDataOutputStream.closeForCommit().commit();
+ fsDataOutputStream.closeForCommit().commit();
+ }
+
+ @Test
+ public void testCloseWithoutCommit() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+ fsDataOutputStream.write(part);
+
+ fsDataOutputStream.close();
+
+ // closing without committing discards the in-progress part, so no object appears under the target path
+ assertFalse(fs.exists(objectPath));
+ }
+
+ @Test
+ public void testWriteLargeFile() throws IOException {
+ List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+ for (byte[] buffer : buffers) {
+ fsDataOutputStream.write(buffer);
+ }
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+ }
+
+ @Test
+ public void testConcatWrites() throws IOException {
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("hello", 5));
+ fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+ }
+
+ @Test
+ public void testRegularRecovery() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+ fsDataOutputStream.write(part);
+
+ RecoverableWriter.ResumeRecoverable recoverable = fsDataOutputStream.persist();
+
+ fsDataOutputStream = writer.recover(recoverable);
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, part);
+ }
+
+ @Test
+ public void testContinuousPersistWithoutWrites() throws IOException {
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("hello", 5));
+
+ fsDataOutputStream.persist();
+ fsDataOutputStream.persist();
+ fsDataOutputStream.persist();
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+ }
+
+ @Test
+ public void testWriteSmallDataAndPersist() throws IOException {
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("h", 1));
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("e", 1));
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("l", 1));
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("l", 1));
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("o", 1));
+ fsDataOutputStream.persist();
+
+ fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+ fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+ fsDataOutputStream.persist();
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+ }
+
+ @Test
+ public void testWriteBigDataAndPersist() throws IOException {
+ List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+ for (byte[] buffer : buffers) {
+ fsDataOutputStream.write(buffer);
+ fsDataOutputStream.persist();
+ }
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+ }
+
+ @Test
+ public void testDataWrittenAfterRecovery() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+ fsDataOutputStream.write(part);
+
+ RecoverableWriter.ResumeRecoverable recoverable = fsDataOutputStream.persist();
+
+ fsDataOutputStream = writer.recover(recoverable);
+
+ List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+ for (byte[] buffer : buffers) {
+ fsDataOutputStream.write(buffer);
+ }
+
+ RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+ committer.commit();
+
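+ // the part persisted before recovery must precede the newly written data in the final object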
+ buffers.add(0, part);
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+ }
+
+ @After
+ public void after() throws IOException {
+ try {
+ if (fs != null) {
+ fs.delete(basePath, true);
+ }
+ } finally {
+ FileSystem.initialize(new Configuration());
+ }
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java
new file mode 100644
index 0000000..c435d72
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+import org.apache.flink.fs.osshadoop.OSSTestUtils;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertNotNull;
+
+/** Tests for the {@link OSSRecoverableMultipartUpload}. */
+public class OSSRecoverableMultipartUploadTest {
+
+ private static Path basePath;
+
+ private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+ private FileSystem fs;
+
+ private static final String TEST_OBJECT_NAME_PREFIX = "TEST-OBJECT-";
+
+ private Path objectPath;
+
+ private String uploadId;
+
+ private List<PartETag> completeParts;
+
+ private OSSAccessor ossAccessor;
+
+ private OSSRecoverableMultipartUpload uploader;
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws IOException {
+ OSSTestCredentials.assumeCredentialsAvailable();
+
+ final Configuration conf = new Configuration();
+ conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+ conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+ conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+ FileSystem.initialize(conf);
+
+ basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+ fs = basePath.getFileSystem();
+
+ objectPath = new Path(basePath + "/" + TEST_OBJECT_NAME_PREFIX + UUID.randomUUID());
+
+ ossAccessor =
+ new OSSAccessor(
+ (AliyunOSSFileSystem) ((HadoopFileSystem) fs).getHadoopFileSystem());
+
+ uploadId = ossAccessor.startMultipartUpload(ossAccessor.pathToObject(objectPath));
+
+ completeParts = new ArrayList<>();
+
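+ // fresh upload state: no recovered incomplete part (null), the shared (still empty) list
+ // of completed part ETags, and zero bytes uploaded so far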
+ uploader =
+ new OSSRecoverableMultipartUpload(
+ ossAccessor.pathToObject(objectPath),
+ Executors.newCachedThreadPool(),
+ ossAccessor,
+ null,
+ uploadId,
+ completeParts,
+ 0L);
+ }
+
+ @Test
+ public void testUploadSinglePart() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+ OSSTestUtils.uploadPart(uploader, temporaryFolder, part);
+
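+ // snapshot with no trailing incomplete part; once this returns, the ETag of the uploaded
+ // part is in completeParts, which the completion call below relies on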
+ uploader.getRecoverable(null);
+
+ ossAccessor.completeMultipartUpload(
+ ossAccessor.pathToObject(objectPath), uploadId, completeParts);
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, part);
+ }
+
+ @Test
+ public void testUploadIncompletePart() throws IOException {
+ final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+ RefCountedBufferingFileStream partFile = OSSTestUtils.writeData(temporaryFolder, part);
+
+ partFile.close();
+
+ OSSRecoverable ossRecoverable = uploader.getRecoverable(partFile);
+
+ OSSTestUtils.objectContentEquals(
+ fs, ossAccessor.objectToPath(ossRecoverable.getLastPartObject()), part);
+ }
+
+ @Test
+ public void testMultipartAndIncompletePart() throws IOException {
+ final byte[] firstCompletePart = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+ final byte[] secondCompletePart = OSSTestUtils.bytesOf("hello again", 1024 * 1024);
+ final byte[] thirdIncompletePart = OSSTestUtils.bytesOf("!!!", 1024);
+
+ OSSTestUtils.uploadPart(uploader, temporaryFolder, firstCompletePart);
+ OSSTestUtils.uploadPart(uploader, temporaryFolder, secondCompletePart);
+
+ RefCountedBufferingFileStream partFile =
+ OSSTestUtils.writeData(temporaryFolder, thirdIncompletePart);
+
+ partFile.close();
+
+ OSSRecoverable ossRecoverable = uploader.getRecoverable(partFile);
+
+ assertEquals(2, ossRecoverable.getPartETags().size());
+ assertNotNull(ossRecoverable.getLastPartObject());
+ assertEquals(1026, ossRecoverable.getLastPartObjectLength());
+ assertEquals(2 * 1024 * 1024 + 20, ossRecoverable.getNumBytesInParts());
+ assertEquals(uploadId, ossRecoverable.getUploadId());
+ assertEquals(ossAccessor.pathToObject(objectPath), ossRecoverable.getObjectName());
+
+ ossAccessor.completeMultipartUpload(
+ ossAccessor.pathToObject(objectPath), uploadId, completeParts);
+
+ OSSTestUtils.objectContentEquals(fs, objectPath, firstCompletePart, secondCompletePart);
+
+ OSSTestUtils.objectContentEquals(
+ fs,
+ ossAccessor.objectToPath(ossRecoverable.getLastPartObject()),
+ thirdIncompletePart);
+ }
+
+ @Test
+ public void testRecoverableReflectsTheLatestPartialObject() throws IOException {
+ final byte[] incompletePartOne = OSSTestUtils.bytesOf("AB", 1024);
+ final byte[] incompletePartTwo = OSSTestUtils.bytesOf("ABC", 1024);
+
+ RefCountedBufferingFileStream partFile =
+ OSSTestUtils.writeData(temporaryFolder, incompletePartOne);
+ partFile.close();
+
+ OSSRecoverable recoverableOne = uploader.getRecoverable(partFile);
+
+ partFile = OSSTestUtils.writeData(temporaryFolder, incompletePartTwo);
+ partFile.close();
+ OSSRecoverable recoverableTwo = uploader.getRecoverable(partFile);
+
+ assertFalse(recoverableOne.getLastPartObject().equals(recoverableTwo.getLastPartObject()));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUploadingNonClosedFileAsCompleteShouldThrowException() throws IOException {
+ final byte[] incompletePart = OSSTestUtils.bytesOf("!!!", 1024);
+
+ RefCountedBufferingFileStream partFile =
+ OSSTestUtils.writeData(temporaryFolder, incompletePart);
+
+ uploader.uploadPart(partFile);
+ }
+
+ @After
+ public void after() throws IOException {
+ try {
+ if (fs != null) {
+ fs.delete(basePath, true);
+ }
+ } finally {
+ FileSystem.initialize(new Configuration());
+ }
+ }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java
new file mode 100644
index 0000000..b3a71fc
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import com.aliyun.oss.model.PartETag;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Tests for the {@link OSSRecoverableSerializer}. */
+public class OSSRecoverableSerializerTest {
+
+ private final OSSRecoverableSerializer serializer = OSSRecoverableSerializer.INSTANCE;
+
+ private static final String TEST_OBJECT_NAME = "TEST-OBJECT";
+
+ private static final String TEST_UPLOAD_ID = "TEST-UPLOAD-ID";
+
+ private static final String INCOMPLETE_OBJECT_NAME = "TEST-INCOMPLETE-PART";
+
+ private static final String ETAG_PREFIX = "TEST-ETAG-";
+
+ @Test
+ public void testSerializeEmptyOSSRecoverable() throws IOException {
+ OSSRecoverable originalEmptyRecoverable = createOSSRecoverable(false);
+
+ byte[] serializedRecoverable = serializer.serialize(originalEmptyRecoverable);
+ OSSRecoverable copiedEmptyRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+ assertThat(originalEmptyRecoverable, isEqualTo(copiedEmptyRecoverable));
+ }
+
+ @Test
+ public void testSerializeOSSRecoverableOnlyWithIncompleteObject() throws IOException {
+ OSSRecoverable originalRecoverable = createOSSRecoverable(true);
+
+ byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+ OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+ assertThat(originalRecoverable, isEqualTo(copiedRecoverable));
+ }
+
+ @Test
+ public void testSerializeOSSRecoverableWithIncompleteObject() throws IOException {
+ OSSRecoverable originalRecoverable = createOSSRecoverable(true, 2, 4, 6);
+
+ byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+ OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+ assertThat(originalRecoverable, isEqualTo(copiedRecoverable));
+ }
+
+ @Test
+ public void testSerializeOSSRecoverableWithoutIncompleteObject() throws IOException {
+ OSSRecoverable originalRecoverable = createOSSRecoverable(false, 2, 4, 6);
+
+ byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+ OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+ assertThat(originalRecoverable, isEqualTo(copiedRecoverable));
+ }
+
+ // --------------------------------- Matchers ---------------------------------
+
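+ /**
+ * Field-by-field equality for {@link OSSRecoverable}; part ETags are compared by their
+ * ETag strings only, so part numbers do not affect the match.
+ */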
+ private static TypeSafeMatcher<OSSRecoverable> isEqualTo(OSSRecoverable expectedRecoverable) {
+ return new TypeSafeMatcher<OSSRecoverable>() {
+
+ @Override
+ protected boolean matchesSafely(OSSRecoverable actualRecoverable) {
+
+ return Objects.equals(
+ expectedRecoverable.getObjectName(),
+ actualRecoverable.getObjectName())
+ && Objects.equals(
+ expectedRecoverable.getUploadId(), actualRecoverable.getUploadId())
+ && expectedRecoverable.getNumBytesInParts()
+ == actualRecoverable.getNumBytesInParts()
+ && Objects.equals(
+ expectedRecoverable.getLastPartObject(),
+ actualRecoverable.getLastPartObject())
+ && expectedRecoverable.getLastPartObjectLength()
+ == actualRecoverable.getLastPartObjectLength()
+ && compareLists(
+ expectedRecoverable.getPartETags(),
+ actualRecoverable.getPartETags());
+ }
+
+ private boolean compareLists(final List<PartETag> first, final List<PartETag> second) {
+ return Arrays.equals(
+ first.stream().map(PartETag::getETag).toArray(),
+ second.stream().map(PartETag::getETag).toArray());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(
+ expectedRecoverable + " with ignored LAST_PART_OBJECT_NAME.");
+ }
+ };
+ }
+
+ // --------------------------------- Test Utils ---------------------------------
+
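+ /**
+ * Builds an {@link OSSRecoverable} with one ETag per given part number and, when
+ * {@code withIncompletePart} is set, a fixed incomplete-part object name and length.
+ */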
+ private static OSSRecoverable createOSSRecoverable(
+ boolean withIncompletePart, int... partNumbers) {
+ List<PartETag> etags = new ArrayList<>();
+ for (int i : partNumbers) {
+ etags.add(createEtag(i));
+ }
+
+ if (withIncompletePart) {
+ return new OSSRecoverable(
+ TEST_UPLOAD_ID,
+ TEST_OBJECT_NAME,
+ etags,
+ INCOMPLETE_OBJECT_NAME,
+ 12345L,
+ 54321L);
+ } else {
+ return new OSSRecoverable(TEST_UPLOAD_ID, TEST_OBJECT_NAME, etags, null, 12345L, 0L);
+ }
+ }
+
+ private static PartETag createEtag(int partNumber) {
+ return new PartETag(partNumber, ETAG_PREFIX + partNumber);
+ }
+}