Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/28 08:23:36 UTC

[flink] branch master updated: [FLINK-11388][oss] Add Aliyun OSS recoverable writer

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 672d26e  [FLINK-11388][oss] Add Aliyun OSS recoverable writer
672d26e is described below

commit 672d26e831047a99b4d6432fe0046ff036279887
Author: Jinhu Wu <wu...@126.com>
AuthorDate: Mon Mar 28 16:22:58 2022 +0800

    [FLINK-11388][oss] Add Aliyun OSS recoverable writer
    
    This closes #19072
---
 .../flink/fs/osshadoop/FlinkOSSFileSystem.java     | 103 +++++++++
 .../org/apache/flink/fs/osshadoop/OSSAccessor.java | 100 +++++++++
 .../flink/fs/osshadoop/OSSFileSystemFactory.java   |  32 ++-
 .../flink/fs/osshadoop/writer/OSSCommitter.java    |  93 ++++++++
 .../flink/fs/osshadoop/writer/OSSRecoverable.java  |  83 +++++++
 .../writer/OSSRecoverableFsDataOutputStream.java   | 186 +++++++++++++++
 .../writer/OSSRecoverableMultipartUpload.java      | 237 ++++++++++++++++++++
 .../osshadoop/writer/OSSRecoverableSerializer.java | 170 ++++++++++++++
 .../fs/osshadoop/writer/OSSRecoverableWriter.java  | 152 +++++++++++++
 .../HadoopOSSRecoverableWriterExceptionITCase.java |  73 ++++++
 .../HadoopOSSRecoverableWriterITCase.java          |  81 +++++++
 .../apache/flink/fs/osshadoop/OSSTestUtils.java    | 129 +++++++++++
 .../OSSRecoverableFsDataOutputStreamTest.java      | 249 +++++++++++++++++++++
 .../writer/OSSRecoverableMultipartUploadTest.java  | 205 +++++++++++++++++
 .../writer/OSSRecoverableSerializerTest.java       | 151 +++++++++++++
 15 files changed, 2042 insertions(+), 2 deletions(-)
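
The new writer plugs into Flink's generic RecoverableWriter API. Below is a
minimal life-cycle sketch (write, persist, commit), assuming OSS credentials
are already configured and using a hypothetical bucket and object path:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    public class OssRecoverableWriteSketch {
        public static void main(String[] args) throws Exception {
            Path target = new Path("oss://my-bucket/output/part-0"); // hypothetical
            RecoverableWriter writer = target.getFileSystem().createRecoverableWriter();
            RecoverableFsDataOutputStream out = writer.open(target);
            out.write("hello".getBytes(StandardCharsets.UTF_8));
            // checkpoint the in-progress multipart upload; survives a process failure
            RecoverableWriter.ResumeRecoverable resumable = out.persist();
            // complete the multipart upload and make the object visible
            out.closeForCommit().commit();
        }
    }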

diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java
new file mode 100644
index 0000000..7bce99e
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/FlinkOSSFileSystem.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.core.fs.RefCountedTmpFileCreator;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystem} interface for Aliyun OSS.
+ * This class implements the common behavior provided directly by Flink and delegates other
+ * calls to an implementation of Hadoop's filesystem abstraction.
+ */
+public class FlinkOSSFileSystem extends HadoopFileSystem {
+
+    // Minimum size of each multipart piece, in bytes (10 MB)
+    public static final long MULTIPART_UPLOAD_PART_SIZE_MIN = 10L << 20;
+
+    // Size of each multipart piece, in bytes
+    private long ossUploadPartSize;
+
+    private int maxConcurrentUploadsPerStream;
+
+    private final Executor uploadThreadPool;
+
+    private String localTmpDir;
+
+    private final FunctionWithException<File, RefCountedFileWithStream, IOException>
+            cachedFileCreator;
+
+    private OSSAccessor ossAccessor;
+
+    public FlinkOSSFileSystem(
+            org.apache.hadoop.fs.FileSystem fileSystem,
+            long ossUploadPartSize,
+            int maxConcurrentUploadsPerStream,
+            String localTmpDirectory,
+            OSSAccessor ossAccessor) {
+        super(fileSystem);
+
+        Preconditions.checkArgument(ossUploadPartSize >= MULTIPART_UPLOAD_PART_SIZE_MIN);
+
+        this.ossUploadPartSize = ossUploadPartSize;
+        this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+
+        this.uploadThreadPool = Executors.newCachedThreadPool();
+
+        // Recoverable writer configuration: temporary directory used to cache
+        // data locally before it is uploaded to OSS
+
+        this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
+
+        this.cachedFileCreator =
+                RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
+
+        this.ossAccessor = ossAccessor;
+    }
+
+    @Override
+    public FileSystemKind getKind() {
+        return FileSystemKind.OBJECT_STORE;
+    }
+
+    @Override
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+        return new OSSRecoverableWriter(
+                ossAccessor,
+                ossUploadPartSize,
+                maxConcurrentUploadsPerStream,
+                uploadThreadPool,
+                cachedFileCreator);
+    }
+
+    public String getLocalTmpDir() {
+        return localTmpDir;
+    }
+}
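
FlinkOSSFileSystem is normally created by OSSFileSystemFactory (changed
below). Once the "oss" scheme is initialized, the filesystem can be looked up
through the regular registry; a hedged fragment with a hypothetical bucket:

    import java.net.URI;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystemKind;

    FileSystem fs = FileSystem.get(URI.create("oss://my-bucket/"));
    // OBJECT_STORE signals object-store semantics (e.g. no efficient rename)
    assert fs.getKind() == FileSystemKind.OBJECT_STORE;
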
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java
new file mode 100644
index 0000000..a515f2a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSAccessor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Accessor for Aliyun OSS that provides the bridging logic between Flink's filesystem
+ * abstraction and Hadoop's Aliyun OSS filesystem and object store.
+ */
+public class OSSAccessor {
+
+    private AliyunOSSFileSystem fs;
+    private AliyunOSSFileSystemStore store;
+
+    public OSSAccessor(AliyunOSSFileSystem fs) {
+        this.fs = fs;
+        this.store = fs.getStore();
+    }
+
+    public String pathToObject(final Path path) {
+        org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
+        if (!hadoopPath.isAbsolute()) {
+            hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
+        }
+
+        return hadoopPath.toUri().getPath().substring(1);
+    }
+
+    public Path objectToPath(String object) {
+        return new Path("/" + object);
+    }
+
+    public String startMultipartUpload(String objectName) {
+        return store.getUploadId(objectName);
+    }
+
+    public boolean deleteObject(String objectName) throws IOException {
+        return fs.delete(new org.apache.hadoop.fs.Path('/' + objectName), false);
+    }
+
+    public CompleteMultipartUploadResult completeMultipartUpload(
+            String objectName, String uploadId, List<PartETag> partETags) {
+        return store.completeMultipartUpload(objectName, uploadId, partETags);
+    }
+
+    public PartETag uploadPart(File file, String objectName, String uploadId, int idx)
+            throws IOException {
+        return store.uploadPart(file, objectName, uploadId, idx);
+    }
+
+    public void putObject(String objectName, File file) throws IOException {
+        store.uploadObject(objectName, file);
+    }
+
+    public void getObject(String objectName, String dstPath, long length) throws IOException {
+        long contentLength = store.getObjectMetadata(objectName).getContentLength();
+        if (contentLength != length) {
+            throw new IOException(
+                    String.format(
+                            "Error recovering writer: "
+                                    + "Downloading the last data chunk file gives incorrect length."
+                                    + "File length is %d bytes, RecoveryData indicates %d bytes",
+                            contentLength, length));
+        }
+
+        org.apache.hadoop.fs.Path srcPath = new org.apache.hadoop.fs.Path("/" + objectName);
+        org.apache.hadoop.fs.Path localPath = new org.apache.hadoop.fs.Path(dstPath);
+        fs.copyToLocalFile(srcPath, localPath);
+
+        String crcFileName = "." + localPath.getName() + ".crc";
+        (new File(localPath.getParent().toString() + "/" + crcFileName)).delete();
+    }
+}
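
The path/object-name mapping above strips or re-adds the leading slash of the
URI path (relative paths are first resolved against the working directory). A
standalone mirror of that mapping, for illustration only:

    // hypothetical mirror of OSSAccessor's mapping, absolute paths only
    static String pathToObject(java.net.URI uri) {
        return uri.getPath().substring(1); // "oss://bkt/a/b" -> "a/b"
    }

    static String objectToPath(String object) {
        return "/" + object;               // "a/b" -> "/a/b"
    }
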
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
index 6a1aca7..768f417 100644
--- a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/OSSFileSystemFactory.java
@@ -19,10 +19,13 @@
 package org.apache.flink.fs.osshadoop;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
 import org.slf4j.Logger;
@@ -52,6 +55,23 @@ public class OSSFileSystemFactory implements FileSystemFactory {
      */
     private static final String[] FLINK_CONFIG_PREFIXES = {"fs.oss."};
 
+    public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
+            ConfigOptions.key("oss.upload.min.part.size")
+                    .defaultValue(FlinkOSSFileSystem.MULTIPART_UPLOAD_PART_SIZE_MIN)
+                    .withDescription(
+                            "This option is relevant to the Recoverable Writer and sets the min size of data that "
+                                    + "buffered locally, before being sent to OSS. Flink also takes care of checkpoint locally "
+                                    + "buffered data. This value cannot be less than 100KB or greater than 5GB (limits set by Aliyun OSS).");
+
+    public static final ConfigOption<Integer> MAX_CONCURRENT_UPLOADS =
+            ConfigOptions.key("oss.upload.max.concurrent.uploads")
+                    .defaultValue(Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "This option is relevant to the Recoverable Writer and limits the number of "
+                                    + "parts that can be concurrently in-flight. By default, this is set to "
+                                    + Runtime.getRuntime().availableProcessors()
+                                    + ".");
+
     @Override
     public String getScheme() {
         return "oss";
@@ -81,7 +101,15 @@ public class OSSFileSystemFactory implements FileSystemFactory {
 
         final AliyunOSSFileSystem fs = new AliyunOSSFileSystem();
         fs.initialize(fsUri, hadoopConfig);
-        return new HadoopFileSystem(fs);
+        final String[] localTmpDirectories = ConfigurationUtils.parseTempDirectories(flinkConfig);
+        Preconditions.checkArgument(localTmpDirectories.length > 0);
+        final String localTmpDirectory = localTmpDirectories[0];
+        return new FlinkOSSFileSystem(
+                fs,
+                flinkConfig.getLong(PART_UPLOAD_MIN_SIZE),
+                flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS),
+                localTmpDirectory,
+                new OSSAccessor(fs));
     }
 
     @VisibleForTesting
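
The two new options can also be set programmatically before filesystems are
initialized. A hedged sketch mirroring the ITCases added later in this commit
(endpoint and keys are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    Configuration conf = new Configuration();
    conf.setString("fs.oss.endpoint", "<endpoint>");
    conf.setString("fs.oss.accessKeyId", "<access-key>");
    conf.setString("fs.oss.accessKeySecret", "<secret-key>");
    conf.setLong(OSSFileSystemFactory.PART_UPLOAD_MIN_SIZE, 10L << 20);
    conf.setInteger(OSSFileSystemFactory.MAX_CONCURRENT_UPLOADS, 2);
    FileSystem.initialize(conf);
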
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java
new file mode 100644
index 0000000..699c465
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSCommitter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+
+import com.aliyun.oss.model.PartETag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Committer that finalizes a file by completing its OSS MultiPartUpload. */
+public class OSSCommitter implements RecoverableFsDataOutputStream.Committer {
+    private static final Logger LOG = LoggerFactory.getLogger(OSSCommitter.class);
+
+    private OSSAccessor ossAccessor;
+    private String objectName;
+    private String uploadId;
+    private List<PartETag> partETags;
+    private long totalLength;
+
+    public OSSCommitter(
+            OSSAccessor ossAccessor,
+            String objectName,
+            String uploadId,
+            List<PartETag> partETags,
+            long totalLength) {
+        this.ossAccessor = checkNotNull(ossAccessor);
+        this.objectName = checkNotNull(objectName);
+        this.uploadId = checkNotNull(uploadId);
+        this.partETags = checkNotNull(partETags);
+        this.totalLength = totalLength;
+    }
+
+    @Override
+    public void commit() throws IOException {
+        if (totalLength > 0L) {
+            LOG.info("Committing {} with multi-part upload ID {}", objectName, uploadId);
+            ossAccessor.completeMultipartUpload(objectName, uploadId, partETags);
+        } else {
+            LOG.debug("No data to commit for file: {}", objectName);
+        }
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+        if (totalLength > 0L) {
+            try {
+                LOG.info(
+                        "Trying to commit after recovery {} with multi-part upload ID {}",
+                        objectName,
+                        uploadId);
+                ossAccessor.completeMultipartUpload(objectName, uploadId, partETags);
+            } catch (Exception e) {
+                LOG.info(
+                        "Failed to commit after recovery {} with multi-part upload ID {}. "
+                                + "exception {}",
+                        objectName,
+                        uploadId,
+                        e);
+            }
+        } else {
+            LOG.debug("No data to commit for file: {}", objectName);
+        }
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return new OSSRecoverable(uploadId, objectName, partETags, null, totalLength, 0);
+    }
+}
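
Committing after a failure goes through the serialized CommitRecoverable
rather than a live committer. A hedged fragment continuing the life-cycle
sketch near the top ("writer" and "out" as defined there):

    import org.apache.flink.core.io.SimpleVersionedSerialization;

    RecoverableWriter.CommitRecoverable rec = out.closeForCommit().getRecoverable();
    byte[] state = SimpleVersionedSerialization.writeVersionAndSerialize(
            writer.getCommitRecoverableSerializer(), rec);
    // ... store `state` in a checkpoint; after restore:
    RecoverableWriter.CommitRecoverable restored =
            SimpleVersionedSerialization.readVersionAndDeSerialize(
                    writer.getCommitRecoverableSerializer(), state);
    writer.recoverForCommit(restored).commitAfterRecovery();
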
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java
new file mode 100644
index 0000000..c177e1a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import com.aliyun.oss.model.PartETag;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Data object to recover an OSS MultiPartUpload for a recoverable output stream. */
+public class OSSRecoverable implements RecoverableWriter.ResumeRecoverable {
+    private final String uploadId;
+
+    private final String objectName;
+
+    private final List<PartETag> partETags;
+
+    @Nullable private final String lastPartObject;
+
+    private long numBytesInParts;
+
+    private long lastPartObjectLength;
+
+    public OSSRecoverable(
+            String uploadId,
+            String objectName,
+            List<PartETag> partETags,
+            String lastPartObject,
+            long numBytesInParts,
+            long lastPartObjectLength) {
+        this.uploadId = uploadId;
+        this.objectName = objectName;
+        this.partETags = new ArrayList<>(partETags);
+        this.lastPartObject = lastPartObject;
+        this.numBytesInParts = numBytesInParts;
+        this.lastPartObjectLength = lastPartObjectLength;
+    }
+
+    public String getUploadId() {
+        return uploadId;
+    }
+
+    public String getObjectName() {
+        return objectName;
+    }
+
+    public List<PartETag> getPartETags() {
+        return partETags;
+    }
+
+    @Nullable
+    public String getLastPartObject() {
+        return lastPartObject;
+    }
+
+    public long getNumBytesInParts() {
+        return numBytesInParts;
+    }
+
+    public long getLastPartObjectLength() {
+        return lastPartObjectLength;
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..eb8ce7a
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStream.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.commons.io.IOUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A RecoverableFsDataOutputStream to OSS that is based on a recoverable multipart upload.
+ *
+ * <p>This class is NOT thread-safe. Concurrent writes to this stream result in corrupt or lost
+ * data.
+ *
+ * <p>The {@link #close()} method may be called concurrently when cancelling / shutting down. It
+ * will still ensure that local transient resources (like streams and temp files) are cleaned up,
+ * but will not touch data previously persisted in OSS.
+ */
+@PublicEvolving
+@NotThreadSafe
+public class OSSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /**
+     * Lock that guards the critical sections when new parts are rolled over. Despite the class
+     * being declared not thread safe, we protect certain regions to at least enable concurrent
+     * close() calls during cancellation or abort/cleanup.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private long ossUploadPartSize;
+    private FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator;
+
+    private RefCountedBufferingFileStream fileStream;
+
+    private OSSRecoverableMultipartUpload upload;
+    private long sizeBeforeCurrentPart;
+
+    public OSSRecoverableFsDataOutputStream(
+            long ossUploadPartSize,
+            FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator,
+            OSSRecoverableMultipartUpload upload,
+            long sizeBeforeCurrentPart)
+            throws IOException {
+        this.ossUploadPartSize = ossUploadPartSize;
+        this.cachedFileCreator = cachedFileCreator;
+        this.upload = upload;
+
+        if (upload.getIncompletePart().isPresent()) {
+            this.fileStream =
+                    RefCountedBufferingFileStream.restore(
+                            this.cachedFileCreator, upload.getIncompletePart().get());
+        } else {
+            this.fileStream = RefCountedBufferingFileStream.openNew(this.cachedFileCreator);
+        }
+        this.sizeBeforeCurrentPart = sizeBeforeCurrentPart;
+    }
+
+    @Override
+    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+        lock();
+        try {
+            fileStream.flush();
+
+            switchNewPartFileIfNecessary(ossUploadPartSize);
+
+            return upload.getRecoverable(fileStream);
+        } finally {
+            unlock();
+        }
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        lock();
+        try {
+            uploadCurrentPart();
+            return upload.getCommitter();
+        } finally {
+            unlock();
+        }
+    }
+
+    private void uploadCurrentPart() throws IOException {
+        fileStream.flush();
+        fileStream.close();
+        if (fileStream.getPos() > 0L) {
+            upload.uploadPart(fileStream);
+        }
+        fileStream.release();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return sizeBeforeCurrentPart + fileStream.getPos();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        fileStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        fileStream.write(b, off, len);
+        switchNewPartFileIfNecessary(ossUploadPartSize);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        fileStream.flush();
+        switchNewPartFileIfNecessary(ossUploadPartSize);
+    }
+
+    @Override
+    public void sync() throws IOException {
+        fileStream.sync();
+    }
+
+    @Override
+    public void close() throws IOException {
+        lock();
+        try {
+            fileStream.flush();
+        } finally {
+            IOUtils.closeQuietly(fileStream);
+            fileStream.release();
+            unlock();
+        }
+    }
+
+    private void lock() throws IOException {
+        try {
+            lock.lockInterruptibly();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("interrupted exception: " + e);
+        }
+    }
+
+    private void unlock() {
+        lock.unlock();
+    }
+
+    private void switchNewPartFileIfNecessary(long partSizeThreshold) throws IOException {
+        final long length = fileStream.getPos();
+        if (length >= partSizeThreshold) {
+            lock();
+            try {
+                sizeBeforeCurrentPart += fileStream.getPos();
+
+                uploadCurrentPart();
+
+                fileStream = RefCountedBufferingFileStream.openNew(cachedFileCreator);
+            } finally {
+                unlock();
+            }
+        }
+    }
+}
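
A back-of-the-envelope for the roll-over rule in switchNewPartFileIfNecessary,
assuming the default 10 MB part size (approximate, since a part is rolled only
once a write pushes the local file past the threshold):

    long partSize = 10L << 20;               // roll-over threshold
    long written = 25L << 20;                // pretend 25 MB were written
    long uploadedParts = written / partSize; // ~2 parts handed to uploadPart()
    long buffered = written % partSize;      // ~5 MB still in the local file
    // getPos() = sizeBeforeCurrentPart (~20 MB) + fileStream.getPos() (~5 MB)
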
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java
new file mode 100644
index 0000000..16d2fc6
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUpload.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.RefCountedFSOutputStream;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+
+import com.aliyun.oss.model.PartETag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Uploader for an OSS multipart upload. */
+public class OSSRecoverableMultipartUpload {
+    private static final Logger LOG = LoggerFactory.getLogger(OSSRecoverableMultipartUpload.class);
+
+    private String objectName;
+    private String uploadId;
+    private List<PartETag> completeParts;
+    private Optional<File> incompletePart;
+    private Executor uploadThreadPool;
+    private OSSAccessor ossAccessor;
+    private Deque<CompletableFuture<PartETag>> uploadsInProgress;
+    private int numberOfRegisteredParts;
+    private long expectedSizeInBytes;
+    private final String namePrefixForTempObjects;
+
+    public OSSRecoverableMultipartUpload(
+            String objectName,
+            Executor uploadThreadPool,
+            OSSAccessor ossAccessor,
+            Optional<File> incompletePart,
+            String uploadId,
+            List<PartETag> completeParts,
+            long expectedSizeInBytes) {
+        this.objectName = objectName;
+        if (completeParts != null) {
+            this.completeParts = completeParts;
+        } else {
+            this.completeParts = new ArrayList<>();
+        }
+
+        this.incompletePart = incompletePart;
+        if (uploadId != null) {
+            this.uploadId = uploadId;
+        } else {
+            this.uploadId = ossAccessor.startMultipartUpload(objectName);
+        }
+        this.uploadThreadPool = uploadThreadPool;
+        this.ossAccessor = ossAccessor;
+        this.uploadsInProgress = new ArrayDeque<>();
+        this.numberOfRegisteredParts = this.completeParts.size();
+        this.expectedSizeInBytes = expectedSizeInBytes;
+        this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
+    }
+
+    public Optional<File> getIncompletePart() {
+        return incompletePart;
+    }
+
+    public void uploadPart(RefCountedFSOutputStream file) throws IOException {
+        checkState(file.isClosed());
+
+        final CompletableFuture<PartETag> future = new CompletableFuture<>();
+        uploadsInProgress.add(future);
+
+        numberOfRegisteredParts += 1;
+        expectedSizeInBytes += file.getPos();
+
+        file.retain();
+        uploadThreadPool.execute(
+                new UploadTask(
+                        ossAccessor, objectName, uploadId, numberOfRegisteredParts, file, future));
+    }
+
+    public OSSRecoverable getRecoverable(RefCountedFSOutputStream file) throws IOException {
+        String incompletePartObjectName = uploadSmallPart(file);
+
+        checkState(numberOfRegisteredParts - completeParts.size() == uploadsInProgress.size());
+
+        while (numberOfRegisteredParts - completeParts.size() > 0) {
+            CompletableFuture<PartETag> next = uploadsInProgress.peekFirst();
+            PartETag nextPart;
+            try {
+                nextPart = next.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting for part uploads to complete");
+            } catch (ExecutionException e) {
+                throw new IOException("Uploading parts failed ", e.getCause());
+            }
+
+            completeParts.add(nextPart);
+            uploadsInProgress.removeFirst();
+        }
+
+        if (file == null) {
+            return new OSSRecoverable(
+                    uploadId, objectName, completeParts, null, expectedSizeInBytes, 0);
+        } else {
+            return new OSSRecoverable(
+                    uploadId,
+                    objectName,
+                    completeParts,
+                    incompletePartObjectName,
+                    expectedSizeInBytes,
+                    file.getPos());
+        }
+    }
+
+    private String uploadSmallPart(@Nullable RefCountedFSOutputStream file) throws IOException {
+        if (file == null || file.getPos() == 0L) {
+            return null;
+        }
+
+        String incompletePartObjectName = createIncompletePartObjectName();
+        file.retain();
+
+        try {
+            ossAccessor.putObject(incompletePartObjectName, file.getInputFile());
+        } finally {
+            file.release();
+        }
+        return incompletePartObjectName;
+    }
+
+    private String createIncompletePartObjectName() {
+        return namePrefixForTempObjects + UUID.randomUUID().toString();
+    }
+
+    @VisibleForTesting
+    static String createIncompletePartObjectNamePrefix(String objectName) {
+        checkNotNull(objectName);
+
+        final int lastSlash = objectName.lastIndexOf('/');
+        final String parent;
+        final String child;
+
+        if (lastSlash == -1) {
+            parent = "";
+            child = objectName;
+        } else {
+            parent = objectName.substring(0, lastSlash + 1);
+            child = objectName.substring(lastSlash + 1);
+        }
+        return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
+    }
+
+    public OSSCommitter getCommitter() throws IOException {
+        final OSSRecoverable recoverable = getRecoverable(null);
+        return new OSSCommitter(
+                ossAccessor,
+                recoverable.getObjectName(),
+                recoverable.getUploadId(),
+                recoverable.getPartETags(),
+                recoverable.getNumBytesInParts());
+    }
+
+    private static class UploadTask implements Runnable {
+        private final OSSAccessor ossAccessor;
+
+        private final String objectName;
+
+        private final String uploadId;
+
+        private final int partNumber;
+
+        private final RefCountedFSOutputStream file;
+
+        private final CompletableFuture<PartETag> future;
+
+        UploadTask(
+                OSSAccessor ossAccessor,
+                String objectName,
+                String uploadId,
+                int partNumber,
+                RefCountedFSOutputStream file,
+                CompletableFuture<PartETag> future) {
+            this.ossAccessor = ossAccessor;
+            this.objectName = objectName;
+            this.uploadId = uploadId;
+
+            checkArgument(partNumber >= 1 && partNumber <= 10_000);
+            this.partNumber = partNumber;
+
+            this.file = file;
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            try {
+                PartETag partETag =
+                        ossAccessor.uploadPart(
+                                file.getInputFile(), objectName, uploadId, partNumber);
+                future.complete(partETag);
+                file.release();
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        }
+    }
+}
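
Trailing data smaller than a full part is staged as a separate temporary
object whose name is derived from the target object. Per
createIncompletePartObjectNamePrefix above (callable from the same package):

    String prefix =
            OSSRecoverableMultipartUpload.createIncompletePartObjectNamePrefix(
                    "dir/part-0");
    // prefix equals "dir/_part-0_tmp_"; a random UUID is appended per object
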
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java
new file mode 100644
index 0000000..d92b60d
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.aliyun.oss.model.PartETag;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+/** Serializer implementation for a {@link OSSRecoverable}. */
+@Internal
+public class OSSRecoverableSerializer implements SimpleVersionedSerializer<OSSRecoverable> {
+    static final OSSRecoverableSerializer INSTANCE = new OSSRecoverableSerializer();
+
+    private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+    private static final int MAGIC_NUMBER = 0x98761234;
+
+    private OSSRecoverableSerializer() {}
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(OSSRecoverable ossRecoverable) throws IOException {
+        final byte[] objectBytes = ossRecoverable.getObjectName().getBytes(CHARSET);
+        final byte[] uploadIdBytes = ossRecoverable.getUploadId().getBytes(CHARSET);
+
+        final byte[][] etags = new byte[ossRecoverable.getPartETags().size()][];
+
+        int partEtagBytes = 0;
+        for (int i = 0; i < ossRecoverable.getPartETags().size(); i++) {
+            etags[i] = ossRecoverable.getPartETags().get(i).getETag().getBytes(CHARSET);
+            partEtagBytes += etags[i].length + 2 * Integer.BYTES;
+        }
+
+        final String lastObjectKey = ossRecoverable.getLastPartObject();
+        final byte[] lastPartBytes = lastObjectKey == null ? null : lastObjectKey.getBytes(CHARSET);
+
+        /*
+         * Serialized layout:
+         *   magic number
+         *   object name length + object name bytes
+         *   upload id length + upload id bytes
+         *   etags length + (part number + etag length + etag bytes) per etag
+         *   number of bytes in parts
+         *   last part bytes length + last part bytes
+         *   last part object length
+         */
+        final byte[] resultBytes =
+                new byte
+                        [Integer.BYTES
+                                + Integer.BYTES
+                                + objectBytes.length
+                                + Integer.BYTES
+                                + uploadIdBytes.length
+                                + Integer.BYTES
+                                + partEtagBytes
+                                + Long.BYTES
+                                + Integer.BYTES
+                                + (lastPartBytes == null ? 0 : lastPartBytes.length)
+                                + Long.BYTES];
+
+        ByteBuffer byteBuffer = ByteBuffer.wrap(resultBytes).order(ByteOrder.LITTLE_ENDIAN);
+
+        byteBuffer.putInt(MAGIC_NUMBER);
+
+        byteBuffer.putInt(objectBytes.length);
+        byteBuffer.put(objectBytes);
+
+        byteBuffer.putInt(uploadIdBytes.length);
+        byteBuffer.put(uploadIdBytes);
+
+        byteBuffer.putInt(etags.length);
+        for (int i = 0; i < ossRecoverable.getPartETags().size(); i++) {
+            PartETag pe = ossRecoverable.getPartETags().get(i);
+            byteBuffer.putInt(pe.getPartNumber());
+            byteBuffer.putInt(etags[i].length);
+            byteBuffer.put(etags[i]);
+        }
+
+        byteBuffer.putLong(ossRecoverable.getNumBytesInParts());
+
+        if (lastPartBytes == null) {
+            byteBuffer.putInt(0);
+        } else {
+            byteBuffer.putInt(lastPartBytes.length);
+            byteBuffer.put(lastPartBytes);
+        }
+
+        byteBuffer.putLong(ossRecoverable.getLastPartObjectLength());
+
+        return resultBytes;
+    }
+
+    @Override
+    public OSSRecoverable deserialize(int version, byte[] serialized) throws IOException {
+        switch (version) {
+            case 1:
+                return deserializeV1(serialized);
+            default:
+                throw new IOException("Unrecognized version or corrupt state: " + version);
+        }
+    }
+
+    private OSSRecoverable deserializeV1(byte[] serialized) throws IOException {
+        final ByteBuffer byteBuffer = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+        final int magicNumber = byteBuffer.getInt();
+        if (magicNumber != MAGIC_NUMBER) {
+            throw new IOException("Corrupt data: Unexpected magic number " + magicNumber);
+        }
+
+        final byte[] objectBytes = new byte[byteBuffer.getInt()];
+        byteBuffer.get(objectBytes);
+
+        final byte[] uploadIdBytes = new byte[byteBuffer.getInt()];
+        byteBuffer.get(uploadIdBytes);
+
+        final int numParts = byteBuffer.getInt();
+        final ArrayList<PartETag> parts = new ArrayList<>(numParts);
+        for (int i = 0; i < numParts; i++) {
+            final int partNum = byteBuffer.getInt();
+            final byte[] buffer = new byte[byteBuffer.getInt()];
+            byteBuffer.get(buffer);
+            parts.add(new PartETag(partNum, new String(buffer, CHARSET)));
+        }
+
+        final long numBytes = byteBuffer.getLong();
+
+        final String lastPart;
+        final int lastObjectArraySize = byteBuffer.getInt();
+        if (lastObjectArraySize == 0) {
+            lastPart = null;
+        } else {
+            byte[] lastPartBytes = new byte[lastObjectArraySize];
+            byteBuffer.get(lastPartBytes);
+            lastPart = new String(lastPartBytes, CHARSET);
+        }
+
+        final long lastPartLength = byteBuffer.getLong();
+
+        return new OSSRecoverable(
+                new String(uploadIdBytes, CHARSET),
+                new String(objectBytes, CHARSET),
+                parts,
+                lastPart,
+                numBytes,
+                lastPartLength);
+    }
+}
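
A hedged round-trip sketch of the serializer (same-package access to the
package-private INSTANCE; the etag value is hypothetical):

    import java.util.Collections;
    import com.aliyun.oss.model.PartETag;

    OSSRecoverable rec = new OSSRecoverable(
            "upload-id", "dir/part-0",
            Collections.singletonList(new PartETag(1, "etag-1")),
            null, 10L << 20, 0L);
    byte[] bytes = OSSRecoverableSerializer.INSTANCE.serialize(rec);
    OSSRecoverable back = OSSRecoverableSerializer.INSTANCE.deserialize(1, bytes);
    // back carries the same upload id, object name, etags, and byte counts
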
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java
new file mode 100644
index 0000000..266d259
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/main/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.core.fs.BackPressuringExecutor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation of the {@link RecoverableWriter} against OSS.
+ *
+ * <p>This implementation makes heavy use of MultiPart Uploads in OSS to persist intermediate data
+ * as soon as possible.
+ */
+public class OSSRecoverableWriter implements RecoverableWriter {
+
+    private OSSAccessor ossAccessor;
+    private long ossUploadPartSize;
+    private int streamConcurrentUploads;
+    private Executor executor;
+
+    private final FunctionWithException<File, RefCountedFileWithStream, IOException>
+            cachedFileCreator;
+
+    public OSSRecoverableWriter(
+            OSSAccessor ossAccessor,
+            long ossUploadPartSize,
+            int streamConcurrentUploads,
+            Executor executor,
+            FunctionWithException<File, RefCountedFileWithStream, IOException> cachedFileCreator) {
+        this.ossAccessor = ossAccessor;
+        this.ossUploadPartSize = ossUploadPartSize;
+        this.streamConcurrentUploads = streamConcurrentUploads;
+        this.executor = executor;
+        this.cachedFileCreator = cachedFileCreator;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        return new OSSRecoverableFsDataOutputStream(
+                ossUploadPartSize,
+                cachedFileCreator,
+                new OSSRecoverableMultipartUpload(
+                        ossAccessor.pathToObject(path),
+                        getExecutor(),
+                        ossAccessor,
+                        Optional.empty(),
+                        null,
+                        null,
+                        0),
+                0L);
+    }
+
+    @Override
+    public OSSRecoverableFsDataOutputStream recover(ResumeRecoverable resumable)
+            throws IOException {
+        final OSSRecoverable recoverable = (OSSRecoverable) resumable;
+        OSSRecoverableMultipartUpload upload =
+                new OSSRecoverableMultipartUpload(
+                        recoverable.getObjectName(),
+                        getExecutor(),
+                        ossAccessor,
+                        recoverInProgressPart(recoverable),
+                        recoverable.getUploadId(),
+                        recoverable.getPartETags(),
+                        recoverable.getNumBytesInParts());
+        return new OSSRecoverableFsDataOutputStream(
+                ossUploadPartSize, cachedFileCreator, upload, recoverable.getNumBytesInParts());
+    }
+
+    private Optional<File> recoverInProgressPart(OSSRecoverable recoverable) throws IOException {
+        String objectKey = recoverable.getLastPartObject();
+        if (objectKey == null) {
+            return Optional.empty();
+        }
+
+        final RefCountedFileWithStream refCountedFile = cachedFileCreator.apply(null);
+        final File file = refCountedFile.getFile();
+
+        ossAccessor.getObject(
+                objectKey, file.getAbsolutePath(), recoverable.getLastPartObjectLength());
+
+        return Optional.of(file);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return true;
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        final OSSRecoverable ossRecoverable = (OSSRecoverable) resumable;
+        final String smallPartObjectToDelete = ossRecoverable.getLastPartObject();
+        return smallPartObjectToDelete != null && ossAccessor.deleteObject(smallPartObjectToDelete);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable)
+            throws IOException {
+        final OSSRecoverableFsDataOutputStream recovered = recover((OSSRecoverable) resumable);
+        return recovered.closeForCommit();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+        return (SimpleVersionedSerializer) OSSRecoverableSerializer.INSTANCE;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+        return (SimpleVersionedSerializer) OSSRecoverableSerializer.INSTANCE;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    private Executor getExecutor() {
+        if (streamConcurrentUploads <= 0) {
+            return executor;
+        }
+        return new BackPressuringExecutor(executor, streamConcurrentUploads);
+    }
+}
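
Resuming after a failure mirrors the commit path but goes through recover().
A hedged fragment, again continuing the life-cycle sketch ("writer" and
"resumable" as defined there):

    import org.apache.flink.core.io.SimpleVersionedSerialization;

    byte[] state = SimpleVersionedSerialization.writeVersionAndSerialize(
            writer.getResumeRecoverableSerializer(), resumable);
    // ... after restore from a checkpoint:
    RecoverableWriter.ResumeRecoverable restored =
            SimpleVersionedSerialization.readVersionAndDeSerialize(
                    writer.getResumeRecoverableSerializer(), state);
    RecoverableFsDataOutputStream resumed = writer.recover(restored);
    // the stream continues at the persisted position; any staged small-part
    // object is re-downloaded into the local cache by recoverInProgressPart()
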
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java
new file mode 100644
index 0000000..ed4964b
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterExceptionITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterExceptionITCase;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.MAX_CONCURRENT_UPLOADS;
+
+/**
+ * Tests for exception throwing in the {@link
+ * org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter OSSRecoverableWriter}.
+ */
+public class HadoopOSSRecoverableWriterExceptionITCase
+        extends AbstractHadoopRecoverableWriterExceptionITCase {
+
+    // ----------------------- OSS general configuration -----------------------
+
+    private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+    @BeforeClass
+    public static void checkCredentialsAndSetup() throws IOException {
+        // check whether credentials exist
+        OSSTestCredentials.assumeCredentialsAvailable();
+
+        basePath = new Path(OSSTestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
+
+        // initialize configuration with valid credentials
+        final Configuration conf = new Configuration();
+        conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+        conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+        conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+
+        conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+        final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "/oss_tmp_dir";
+        conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+        FileSystem.initialize(conf);
+
+        skipped = false;
+    }
+
+    @Override
+    protected String getLocalTmpDir() throws Exception {
+        return ((FlinkOSSFileSystem) getFileSystem()).getLocalTmpDir();
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java
new file mode 100644
index 0000000..715f7ce
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/HadoopOSSRecoverableWriterITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverable;
+import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterITCase;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.MAX_CONCURRENT_UPLOADS;
+import static org.apache.flink.fs.osshadoop.OSSFileSystemFactory.PART_UPLOAD_MIN_SIZE;
+
+/**
+ * Tests for the {@link org.apache.flink.fs.osshadoop.writer.OSSRecoverableWriter
+ * OSSRecoverableWriter}.
+ */
+public class HadoopOSSRecoverableWriterITCase extends AbstractHadoopRecoverableWriterITCase {
+
+    // ----------------------- OSS general configuration -----------------------
+
+    private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
+
+    @BeforeClass
+    public static void checkCredentialsAndSetup() throws IOException {
+        // check whether credentials exist
+        OSSTestCredentials.assumeCredentialsAvailable();
+
+        basePath = new Path(OSSTestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
+
+        // initialize configuration with valid credentials
+        final Configuration conf = new Configuration();
+        conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+        conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+        conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+
+        conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
+
+        final String defaultTmpDir = TEMP_FOLDER.getRoot().getAbsolutePath() + "/oss_tmp_dir";
+        conf.setString(CoreOptions.TMP_DIRS, defaultTmpDir);
+
+        FileSystem.initialize(conf);
+        bigDataChunk =
+                createBigDataChunk(BIG_CHUNK_DATA_PATTERN, PART_UPLOAD_MIN_SIZE.defaultValue());
+        skipped = false;
+    }
+
+    @Override
+    protected String getLocalTmpDir() throws Exception {
+        return ((FlinkOSSFileSystem) getFileSystem()).getLocalTmpDir();
+    }
+
+    @Override
+    protected String getIncompleteObjectName(RecoverableWriter.ResumeRecoverable recoverable) {
+        return ((OSSRecoverable) recoverable).getLastPartObject();
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java
new file mode 100644
index 0000000..47330cf
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/OSSTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.core.fs.RefCountedFileWithStream;
+import org.apache.flink.fs.osshadoop.writer.OSSRecoverableMultipartUpload;
+
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SplittableRandom;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * Test utilities for the OSS recoverable writer tests: object content assertions, payload
+ * builders, and part upload helpers.
+ */
+public class OSSTestUtils {
+    private static final int BUFFER_SIZE = 10;
+
+    public static void objectContentEquals(
+            FileSystem fs, Path objectPath, List<byte[]> expectContents) throws IOException {
+        String actualContent;
+        try (FSDataInputStream in = fs.open(objectPath);
+                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            byte[] buffer = new byte[4096];
+            int bytes = in.read(buffer);
+            while (bytes != -1) {
+                out.write(buffer, 0, bytes);
+                bytes = in.read(buffer);
+            }
+            actualContent = out.toString(StandardCharsets.UTF_8.name());
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        for (byte[] bytes : expectContents) {
+            out.write(bytes);
+        }
+        assertEquals(out.toString(StandardCharsets.UTF_8.name()), actualContent);
+    }
+
+    public static void objectContentEquals(FileSystem fs, Path objectPath, byte[]... expectContents)
+            throws IOException {
+        objectContentEquals(fs, objectPath, Arrays.asList(expectContents));
+    }
+
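+    /**
+     * Builds a payload by repeating {@code str} until at least {@code requiredSize} bytes are
+     * accumulated; the result is therefore rounded up to a multiple of {@code str}'s length.
+     */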
+    public static byte[] bytesOf(String str, long requiredSize) {
+        StringBuilder sb = new StringBuilder();
+        while (sb.length() < requiredSize) {
+            sb.append(str);
+        }
+        return sb.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
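+    /**
+     * Stages {@code content} in a temp-file-backed stream, closes it, and hands it to the
+     * uploader as one complete part.
+     */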
+    public static void uploadPart(
+            OSSRecoverableMultipartUpload uploader,
+            final TemporaryFolder temporaryFolder,
+            final byte[] content)
+            throws IOException {
+        RefCountedBufferingFileStream partFile = writeData(temporaryFolder, content);
+
+        partFile.close();
+
+        uploader.uploadPart(partFile);
+    }
+
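+    /**
+     * Writes {@code content} to a fresh ref-counted buffering stream backed by a temp file; the
+     * caller is responsible for closing the stream before uploading it.
+     */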
+    public static RefCountedBufferingFileStream writeData(
+            TemporaryFolder temporaryFolder, byte[] content) throws IOException {
+        final File newFile = new File(temporaryFolder.getRoot(), ".tmp_" + UUID.randomUUID());
+        final OutputStream out =
+                Files.newOutputStream(newFile.toPath(), StandardOpenOption.CREATE_NEW);
+
+        final RefCountedBufferingFileStream testStream =
+                new RefCountedBufferingFileStream(
+                        RefCountedFileWithStream.newFile(newFile, out), BUFFER_SIZE);
+
+        testStream.write(content, 0, content.length);
+        return testStream;
+    }
+
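+    /**
+     * Generates random chunks totalling at least {@code size} bytes, with each chunk's length
+     * drawn uniformly from [0, 2 * partSize) so that chunks fall on both sides of the part size.
+     */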
+    public static List<byte[]> generateRandomBuffer(long size, int partSize) {
+        List<byte[]> buffers = new ArrayList<>();
+
+        final SplittableRandom random = new SplittableRandom();
+
+        long totalSize = 0L;
+
+        while (totalSize < size) {
+            int bufferSize = random.nextInt(0, 2 * partSize);
+            byte[] buffer = new byte[bufferSize];
+            for (int i = 0; i < bufferSize; ++i) {
+                buffer[i] = (byte) (random.nextInt() & 0xFF);
+            }
+
+            buffers.add(buffer);
+            totalSize += bufferSize;
+        }
+
+        return buffers;
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java
new file mode 100644
index 0000000..d452a5f
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.osshadoop.OSSTestUtils;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static junit.framework.TestCase.assertFalse;
+
+/** Tests for the {@link OSSRecoverableFsDataOutputStream}. */
+public class OSSRecoverableFsDataOutputStreamTest {
+
+    private static Path basePath;
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    private FileSystem fs;
+
+    private static final String TEST_OBJECT_NAME_PREFIX = "TEST-OBJECT-";
+
+    private Path objectPath;
+
+    private RecoverableWriter writer;
+
+    private RecoverableFsDataOutputStream fsDataOutputStream;
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws IOException {
+        OSSTestCredentials.assumeCredentialsAvailable();
+
+        final Configuration conf = new Configuration();
+        conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+        conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+        conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+        FileSystem.initialize(conf);
+
+        basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+        fs = basePath.getFileSystem();
+        writer = fs.createRecoverableWriter();
+
+        objectPath = new Path(basePath + "/" + TEST_OBJECT_NAME_PREFIX + UUID.randomUUID());
+
+        fsDataOutputStream = writer.open(objectPath);
+    }
+
+    @Test
+    public void testRegularDataWritten() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+        fsDataOutputStream.write(part);
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, part);
+    }
+
+    @Test
+    public void testNoDataWritten() throws IOException {
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        // will not create empty object
+        assertFalse(fs.exists(objectPath));
+    }
+
+    @Test(expected = IOException.class)
+    public void testCloseForCommitOnClosedStreamShouldFail() throws IOException {
+        fsDataOutputStream.closeForCommit().commit();
+        fsDataOutputStream.closeForCommit().commit();
+    }
+
+    @Test
+    public void testCloseWithoutCommit() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+        fsDataOutputStream.write(part);
+
+        fsDataOutputStream.close();
+
+        // close without commit will not upload current part
+        assertFalse(fs.exists(objectPath));
+    }
+
+    @Test
+    public void testWriteLargeFile() throws IOException {
+        List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+        for (byte[] buffer : buffers) {
+            fsDataOutputStream.write(buffer);
+        }
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+    }
+
+    @Test
+    public void testConcatWrites() throws IOException {
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("hello", 5));
+        fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+    }
+
+    @Test
+    public void testRegularRecovery() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+        fsDataOutputStream.write(part);
+
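+        // persist() makes the data written so far durable and returns a token from which the
+        // stream can later be recovered.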
+        RecoverableWriter.ResumeRecoverable recoverable = fsDataOutputStream.persist();
+
+        fsDataOutputStream = writer.recover(recoverable);
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, part);
+    }
+
+    @Test
+    public void testContinuousPersistWithoutWrites() throws IOException {
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("hello", 5));
+
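+        // Repeated persist() calls without intervening writes must not duplicate any data.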
+        fsDataOutputStream.persist();
+        fsDataOutputStream.persist();
+        fsDataOutputStream.persist();
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+    }
+
+    @Test
+    public void testWriteSmallDataAndPersist() throws IOException {
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("h", 1));
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("e", 1));
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("l", 1));
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("l", 1));
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("o", 1));
+        fsDataOutputStream.persist();
+
+        fsDataOutputStream.write(OSSTestUtils.bytesOf(" ", 1));
+        fsDataOutputStream.write(OSSTestUtils.bytesOf("world", 5));
+        fsDataOutputStream.persist();
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, OSSTestUtils.bytesOf("hello world", 11));
+    }
+
+    @Test
+    public void testWriteBigDataAndPersist() throws IOException {
+        List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+        for (byte[] buffer : buffers) {
+            fsDataOutputStream.write(buffer);
+            fsDataOutputStream.persist();
+        }
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+    }
+
+    @Test
+    public void testDataWrittenAfterRecovery() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+        fsDataOutputStream.write(part);
+
+        RecoverableWriter.ResumeRecoverable recoverable = fsDataOutputStream.persist();
+
+        fsDataOutputStream = writer.recover(recoverable);
+
+        List<byte[]> buffers = OSSTestUtils.generateRandomBuffer(50 * 1024 * 1024, 10 * 1024 * 1024);
+        for (byte[] buffer : buffers) {
+            fsDataOutputStream.write(buffer);
+        }
+
+        RecoverableFsDataOutputStream.Committer committer = fsDataOutputStream.closeForCommit();
+        committer.commit();
+
+        buffers.add(0, part);
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, buffers);
+    }
+
+    @After
+    public void after() throws IOException {
+        try {
+            if (fs != null) {
+                fs.delete(basePath, true);
+            }
+        } finally {
+            FileSystem.initialize(new Configuration());
+        }
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java
new file mode 100644
index 0000000..c435d72
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableMultipartUploadTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RefCountedBufferingFileStream;
+import org.apache.flink.fs.osshadoop.OSSAccessor;
+import org.apache.flink.fs.osshadoop.OSSTestUtils;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.testutils.oss.OSSTestCredentials;
+
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertNotNull;
+
+/** Tests for the {@link OSSRecoverableMultipartUpload}. */
+public class OSSRecoverableMultipartUploadTest {
+
+    private static Path basePath;
+
+    private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
+
+    private FileSystem fs;
+
+    private static final String TEST_OBJECT_NAME_PREFIX = "TEST-OBJECT-";
+
+    private Path objectPath;
+
+    private String uploadId;
+
+    private List<PartETag> completeParts;
+
+    private OSSAccessor ossAccessor;
+
+    private OSSRecoverableMultipartUpload uploader;
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws IOException {
+        OSSTestCredentials.assumeCredentialsAvailable();
+
+        final Configuration conf = new Configuration();
+        conf.setString("fs.oss.endpoint", OSSTestCredentials.getOSSEndpoint());
+        conf.setString("fs.oss.accessKeyId", OSSTestCredentials.getOSSAccessKey());
+        conf.setString("fs.oss.accessKeySecret", OSSTestCredentials.getOSSSecretKey());
+        FileSystem.initialize(conf);
+
+        basePath = new Path(OSSTestCredentials.getTestBucketUri() + TEST_DATA_DIR);
+        fs = basePath.getFileSystem();
+
+        objectPath = new Path(basePath + "/" + TEST_OBJECT_NAME_PREFIX + UUID.randomUUID());
+
+        ossAccessor =
+                new OSSAccessor(
+                        (AliyunOSSFileSystem) ((HadoopFileSystem) fs).getHadoopFileSystem());
+
+        uploadId = ossAccessor.startMultipartUpload(ossAccessor.pathToObject(objectPath));
+
+        completeParts = new ArrayList<>();
+
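+        // Start from a fresh multipart upload: no completed parts and no trailing incomplete part.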
+        uploader =
+                new OSSRecoverableMultipartUpload(
+                        ossAccessor.pathToObject(objectPath),
+                        Executors.newCachedThreadPool(),
+                        ossAccessor,
+                        null,
+                        uploadId,
+                        completeParts,
+                        0L);
+    }
+
+    @Test
+    public void testUploadSinglePart() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+        OSSTestUtils.uploadPart(uploader, temporaryFolder, part);
+
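+        // No trailing incomplete part here; getRecoverable(null) ensures the uploaded part has
+        // been registered in completeParts before the multipart upload is completed below.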
+        uploader.getRecoverable(null);
+
+        ossAccessor.completeMultipartUpload(
+                ossAccessor.pathToObject(objectPath), uploadId, completeParts);
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, part);
+    }
+
+    @Test
+    public void testUploadIncompletePart() throws IOException {
+        final byte[] part = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+
+        RefCountedBufferingFileStream partFile = OSSTestUtils.writeData(temporaryFolder, part);
+
+        partFile.close();
+
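+        // A chunk smaller than a full part is not uploaded as a multipart part; getRecoverable
+        // stages it as a standalone "last part" object referenced by the recoverable.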
+        OSSRecoverable ossRecoverable = uploader.getRecoverable(partFile);
+
+        OSSTestUtils.objectContentEquals(
+                fs, ossAccessor.objectToPath(ossRecoverable.getLastPartObject()), part);
+    }
+
+    @Test
+    public void testMultipartAndIncompletePart() throws IOException {
+        final byte[] firstCompletePart = OSSTestUtils.bytesOf("hello world", 1024 * 1024);
+        final byte[] secondCompletePart = OSSTestUtils.bytesOf("hello again", 1024 * 1024);
+        final byte[] thirdIncompletePart = OSSTestUtils.bytesOf("!!!", 1024);
+
+        OSSTestUtils.uploadPart(uploader, temporaryFolder, firstCompletePart);
+        OSSTestUtils.uploadPart(uploader, temporaryFolder, secondCompletePart);
+
+        RefCountedBufferingFileStream partFile =
+                OSSTestUtils.writeData(temporaryFolder, thirdIncompletePart);
+
+        partFile.close();
+
+        OSSRecoverable ossRecoverable = uploader.getRecoverable(partFile);
+
+        // bytesOf(...) rounds each payload up to a multiple of its pattern length: the 1024-byte
+        // request for "!!!" yields 1026 bytes, and each 1 MiB "hello world" part gains 10 bytes.
+        assertEquals(2, ossRecoverable.getPartETags().size());
+        assertNotNull(ossRecoverable.getLastPartObject());
+        assertEquals(1026, ossRecoverable.getLastPartObjectLength());
+        assertEquals(2 * 1024 * 1024 + 20, ossRecoverable.getNumBytesInParts());
+        assertEquals(uploadId, ossRecoverable.getUploadId());
+        assertEquals(ossAccessor.pathToObject(objectPath), ossRecoverable.getObjectName());
+
+        ossAccessor.completeMultipartUpload(
+                ossAccessor.pathToObject(objectPath), uploadId, completeParts);
+
+        OSSTestUtils.objectContentEquals(fs, objectPath, firstCompletePart, secondCompletePart);
+
+        OSSTestUtils.objectContentEquals(
+                fs,
+                ossAccessor.objectToPath(ossRecoverable.getLastPartObject()),
+                thirdIncompletePart);
+    }
+
+    @Test
+    public void testRecoverableReflectsTheLatestPartialObject() throws IOException {
+        final byte[] incompletePartOne = OSSTestUtils.bytesOf("AB", 1024);
+        final byte[] incompletePartTwo = OSSTestUtils.bytesOf("ABC", 1024);
+
+        RefCountedBufferingFileStream partFile =
+                OSSTestUtils.writeData(temporaryFolder, incompletePartOne);
+        partFile.close();
+
+        OSSRecoverable recoverableOne = uploader.getRecoverable(partFile);
+
+        partFile = OSSTestUtils.writeData(temporaryFolder, incompletePartTwo);
+        partFile.close();
+        OSSRecoverable recoverableTwo = uploader.getRecoverable(partFile);
+
+        assertFalse(recoverableOne.getLastPartObject().equals(recoverableTwo.getLastPartObject()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testUploadingNonClosedFileAsCompleteShouldThrowException() throws IOException {
+        final byte[] incompletePart = OSSTestUtils.bytesOf("!!!", 1024);
+
+        RefCountedBufferingFileStream partFile =
+                OSSTestUtils.writeData(temporaryFolder, incompletePart);
+
+        uploader.uploadPart(partFile);
+    }
+
+    @After
+    public void after() throws IOException {
+        try {
+            if (fs != null) {
+                fs.delete(basePath, true);
+            }
+        } finally {
+            FileSystem.initialize(new Configuration());
+        }
+    }
+}
diff --git a/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java
new file mode 100644
index 0000000..b3a71fc
--- /dev/null
+++ b/flink-filesystems/flink-oss-fs-hadoop/src/test/java/org/apache/flink/fs/osshadoop/writer/OSSRecoverableSerializerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.osshadoop.writer;
+
+import com.aliyun.oss.model.PartETag;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Tests for the {@link OSSRecoverableSerializer}. */
+public class OSSRecoverableSerializerTest {
+
+    private final OSSRecoverableSerializer serializer = OSSRecoverableSerializer.INSTANCE;
+
+    private static final String TEST_OBJECT_NAME = "TEST-OBJECT";
+
+    private static final String TEST_UPLOAD_ID = "TEST-UPLOAD-ID";
+
+    private static final String INCOMPLETE_OBJECT_NAME = "TEST-INCOMPLETE-PART";
+
+    private static final String ETAG_PREFIX = "TEST-ETAG-";
+
+    @Test
+    public void testSerializeEmptyOSSRecoverable() throws IOException {
+        OSSRecoverable originalEmptyRecoverable = createOSSRecoverable(false);
+
+        byte[] serializedRecoverable = serializer.serialize(originalEmptyRecoverable);
+        OSSRecoverable copiedEmptyRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+        assertThat(copiedEmptyRecoverable, isEqualTo(originalEmptyRecoverable));
+    }
+
+    @Test
+    public void testSerializeOSSRecoverableOnlyWithIncompleteObject() throws IOException {
+        OSSRecoverable originalRecoverable = createOSSRecoverable(true);
+
+        byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+        OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+        assertThat(copiedRecoverable, isEqualTo(originalRecoverable));
+    }
+
+    @Test
+    public void testSerializeOSSRecoverableWithIncompleteObject() throws IOException {
+        OSSRecoverable originalRecoverable = createOSSRecoverable(true, 2, 4, 6);
+
+        byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+        OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+        assertThat(copiedRecoverable, isEqualTo(originalRecoverable));
+    }
+
+    @Test
+    public void testSerializeOSSRecoverableWithoutIncompleteObject() throws IOException {
+        OSSRecoverable originalRecoverable = createOSSRecoverable(false, 2, 4, 6);
+
+        byte[] serializedRecoverable = serializer.serialize(originalRecoverable);
+        OSSRecoverable copiedRecoverable = serializer.deserialize(1, serializedRecoverable);
+
+        assertThat(copiedRecoverable, isEqualTo(originalRecoverable));
+    }
+
+    // --------------------------------- Matchers ---------------------------------
+
+    private static TypeSafeMatcher<OSSRecoverable> isEqualTo(OSSRecoverable expectedRecoverable) {
+        return new TypeSafeMatcher<OSSRecoverable>() {
+
+            @Override
+            protected boolean matchesSafely(OSSRecoverable actualRecoverable) {
+
+                return Objects.equals(
+                                expectedRecoverable.getObjectName(),
+                                actualRecoverable.getObjectName())
+                        && Objects.equals(
+                                expectedRecoverable.getUploadId(), actualRecoverable.getUploadId())
+                        && expectedRecoverable.getNumBytesInParts()
+                                == actualRecoverable.getNumBytesInParts()
+                        && Objects.equals(
+                                expectedRecoverable.getLastPartObject(),
+                                actualRecoverable.getLastPartObject())
+                        && expectedRecoverable.getLastPartObjectLength()
+                                == actualRecoverable.getLastPartObjectLength()
+                        && compareLists(
+                                expectedRecoverable.getPartETags(),
+                                actualRecoverable.getPartETags());
+            }
+
+            private boolean compareLists(final List<PartETag> first, final List<PartETag> second) {
+                return Arrays.equals(
+                        first.stream().map(PartETag::getETag).toArray(),
+                        second.stream().map(PartETag::getETag).toArray());
+            }
+
+            @Override
+            public void describeTo(Description description) {
+                description.appendText(
+                        "an OSSRecoverable equal to "
+                                + expectedRecoverable
+                                + " (part ETags compared by ETag only)");
+            }
+        };
+    }
+
+    // --------------------------------- Test Utils ---------------------------------
+
+    private static OSSRecoverable createOSSRecoverable(
+            boolean withIncompletePart, int... partNumbers) {
+        List<PartETag> etags = new ArrayList<>();
+        for (int i : partNumbers) {
+            etags.add(createEtag(i));
+        }
+
+        if (withIncompletePart) {
+            return new OSSRecoverable(
+                    TEST_UPLOAD_ID,
+                    TEST_OBJECT_NAME,
+                    etags,
+                    INCOMPLETE_OBJECT_NAME,
+                    12345L,
+                    54321L);
+        } else {
+            return new OSSRecoverable(TEST_UPLOAD_ID, TEST_OBJECT_NAME, etags, null, 12345L, 0L);
+        }
+    }
+
+    private static PartETag createEtag(int partNumber) {
+        return new PartETag(partNumber, ETAG_PREFIX + partNumber);
+    }
+}