You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/14 08:53:59 UTC

[GitHub] [flink] ramkrish86 opened a new pull request, #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

ramkrish86 opened a new pull request, #21508:
URL: https://github.com/apache/flink/pull/21508

   
   ## What is the purpose of the change
   
   Supports ABFS as Streaming file sink
   
   
   ## Brief change log
   
   - Allows sink to write to ABFS
   - Allows to recover from ABFS
   - There is no truncate API in ABFS hadoop driver. But for cases where we want to recover a job from an older persisted offset, we need to truncate. For which we rewrite the file.
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
     - Verified the basics in a Kubernetes based session cluster with 1 JM and 4 TMs. 
     - Was able to write to ABFS and also recover from it. 
     - **Pls note this is an initial PR, we are still doing some extensive testing. This is for initial feedback**
     -  Used the existing test cases in **HadoopRecoverableWriterTest** and extended the same to verify the functionality but with ABFS.
     - New test case name AzureBlobRecoverableWriterTest
     - Existing Hadoop serializers are used. Nothing changed in this format.
     - `[INFO]  T E S T S
   [INFO] -------------------------------------------------------
   [INFO] Running org.apache.flink.fs.azurefs.AzureBlobRecoverableWriterTest
   [INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 72.345 s - in org.apache.flink.fs.azurefs.AzureBlobRecoverableWriterTest
   [INFO] 
   [INFO] Results:
   [INFO] 
   [INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (No)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes) - Recovery is also handled here.
     - The S3 file system connector: (No)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1350669654

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4b110c5082147bc6d23c5050a158aa83c5f6bc88",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4b110c5082147bc6d23c5050a158aa83c5f6bc88",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b110c5082147bc6d23c5050a158aa83c5f6bc88 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052089932


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);
+                remaining -= buf.length;
+                LOG.info("Bytes remaining to read {}", remaining);
+                fsDataOutputStream.write(buf);
+                dataWritten += buf.length;
+                LOG.info("Successfully wrote {} bytes of data {}", dataWritten);
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);
+    }
+
+    private void rename(FileSystem fs, Path renameTempPath) throws IOException {
+        LOG.info("Renaming the temp rename file {} back to tempFile {}", renameTempPath, tempFile);
+        try {
+            boolean result = fs.rename(renameTempPath, tempFile);
+            if (!result) {
+                LOG.error(
+                        "Unable to recover. Rename operation failed {} to {}",
+                        renameTempPath,
+                        tempFile);
+                throw new IOException("Unable to recover. Rename operation failed");
+            } else {
+                LOG.info("Rename was successful");
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Renaming of tempFile did not happen after truncating {} to {}",
+                    renameTempPath,
+                    tempFile);
+            throw e;
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.hflush();
+    }
+
+    @Override
+    public void sync() throws IOException {
+        out.hflush();
+        out.hsync();
+    }
+
+    @Override
+    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+        sync();
+        return new HadoopFsRecoverable(targetFile, tempFile, getPos() + initialFileSize);
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        final long pos = getPos();
+        close();
+        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
+                fs, new HadoopFsRecoverable(targetFile, tempFile, pos + initialFileSize));
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.close();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Committer
+    // ------------------------------------------------------------------------
+
+    /**
+     * Implementation of a committer for the Hadoop File System abstraction. This implementation
+     * commits by renaming the temp file to the final file path. The temp file is truncated before
+     * renaming in case there is trailing garbage data.
+     */
+    static class ABFSCommitter implements Committer {
+
+        private final FileSystem fs;
+        private final HadoopFsRecoverable recoverable;
+
+        ABFSCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
+            this.fs = checkNotNull(fs);
+            this.recoverable = checkNotNull(recoverable);
+        }
+
+        @Override
+        public void commit() throws IOException {
+            final Path src = recoverable.tempFile();
+            final Path dest = recoverable.targetFile();
+            final long expectedLength = recoverable.offset();
+            FileStatus srcStatus = null;
+            try {
+                srcStatus = fs.getFileStatus(src);
+            } catch (FileNotFoundException fnfe) {
+                // srcStatus will be null
+            } catch (IOException e) {
+                throw new IOException("Cannot clean commit: Staging file does not exist.");
+            }
+            if (srcStatus != null) {
+                LOG.debug(
+                        "The srcStatus is {} and exp length is {}",
+                        srcStatus.getLen(),
+                        expectedLength);
+                if (srcStatus.getLen() != expectedLength) {
+                    LOG.error(
+                            "The src file {} with length {} does not match the expected length {}",
+                            src,
+                            srcStatus.getLen(),
+                            expectedLength);
+                    throw new IOException(
+                            "The src file "
+                                    + src
+                                    + " with length "
+                                    + srcStatus.getLen()
+                                    + " "
+                                    + "does not match the expected length "
+                                    + expectedLength);
+                }
+                try {
+                    fs.rename(src, dest);
+                } catch (IOException e) {
+                    throw new IOException(
+                            "Committing file by rename failed: " + src + " to " + dest, e);
+                }
+            } else if (!fs.exists(dest)) {
+                // neither exists - that can be a sign of
+                //   - (1) a serious problem (file system loss of data)
+                //   - (2) a recovery of a savepoint that is some time old and the users
+                //         removed the files in the meantime.
+
+                // TODO how to handle this?
+                // We probably need an option for users whether this should log,
+                // or result in an exception or unrecoverable exception

Review Comment:
   This I did not handle and left it as it in the HadoopRecoverableDataOS.java. Fine I can throw the exception here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong closed pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong closed pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path
URL: https://github.com/apache/flink/pull/21508


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1357728930

   Pushed the changes. We will update here once the other tests are done. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1352602522

   Sure. I'll try to take a look asap.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1372051535

   @xintongsong  - We are done with the testing too. Will wait for your final set of comments. Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1373152299

   Thanks @xintongsong . Also it would be great if this can be there in 1.17 release. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] carp84 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
carp84 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1352600177

   Thanks for the contribution @ramkrish86 .
   
   @xintongsong could you help review this one since you have experience reviewing similar support for Google storage FS (PR#15599)? Many thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1059862791


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        out = fs.append(tempFile);
+        if (out.getPos() == 0) {
+            // In ABFS when we try to append we don't account for the initial file size like we do
+            // in DFS.
+            // So we explicitly store this and when we do a persist call we make use of it.
+            // This we have raised a bug in ABFS hadoop driver side. Once fixed this will not be
+            // needed. So it should be ok to put this in side the 'if' check.
+            initialFileSize = fs.getFileStatus(tempFile).getLen();
+        }
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            int readBytes = -1;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                readBytes = fsDis.read(buf, 0, buf.length);
+                if (readBytes != -1) {
+                    remaining -= readBytes;
+                    LOG.info("Bytes remaining to read {}", remaining);
+                    fsDataOutputStream.write(buf, 0, readBytes);
+                    dataWritten += readBytes;
+                    LOG.info("Successfully wrote {} bytes of data", dataWritten);
+                } else {
+                    LOG.debug("Reached the end of the file");
+                    remaining = 0;
+                }
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+            if (fsDis != null) {
+                LOG.debug("Closing the input stream");
+                fsDis.close();
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);

Review Comment:
   In HDFS case we try to trucnate and if that is still failing we were throwing exceptions. So I think the same is happening here. This recovery will be a failure. Also as I said truncate happens only in case where we move to an older recovery checkpoint. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1369515475

   >  except for the one about deduplicating HadoopRecoverableFsDataOutputStream and AzureBlobFsRecoverableDataOutputStream
   What specifically you are talking about here? I went through the older comments and I thought I resolved them which ever was discussed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1373364230

   We used the streaming wordcount program as the test case. 
   We generated close to 2G data and collected the word count sample. This result was matched by using output as ABFS file sink and verified the output by running the tests without restarting of any TMs and also by restarting of TMs intermittently to ensure we are able to recover and get back the same word count.  Total TMs - 5. Parallelism - 4. 
   
   We tested with default rolling policy where we are sure the inprogress files will be created and it needs a truncation on recovery from the latest check point. Also tested with checkpoint based rolling policy and in that case no truncation was needed as inprogress files were always committed. 
   We have not extensively tested with ORC/Parquet formats. 
   I can commit for any other further improvements/enhancements/bug fixes that needs to be done here. 
   
   Created the TODO as agreed up on. https://issues.apache.org/jira/browse/FLINK-30588. FYI @xintongsong 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1357577045

   
   > * There're many `TODO`s in `AzureBlobFsRecoverableDataOutputStream`. What's your plan for them?
   
   Mainly 2 TODOs. One is the 2MB buffer and next is the flush support. I think for now it should be good. But I can raise TODO JIRAs and work on it later. Is that ok ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1060557482


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        out = fs.append(tempFile);
+        if (out.getPos() == 0) {
+            // In ABFS when we try to append we don't account for the initial file size like we do
+            // in DFS.
+            // So we explicitly store this and when we do a persist call we make use of it.
+            // This we have raised a bug in ABFS hadoop driver side. Once fixed this will not be
+            // needed. So it should be ok to put this in side the 'if' check.
+            initialFileSize = fs.getFileStatus(tempFile).getLen();
+        }
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            int readBytes = -1;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                readBytes = fsDis.read(buf, 0, buf.length);
+                if (readBytes != -1) {
+                    remaining -= readBytes;
+                    LOG.info("Bytes remaining to read {}", remaining);
+                    fsDataOutputStream.write(buf, 0, readBytes);
+                    dataWritten += readBytes;
+                    LOG.info("Successfully wrote {} bytes of data", dataWritten);
+                } else {
+                    LOG.debug("Reached the end of the file");
+                    remaining = 0;
+                }
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+            if (fsDis != null) {
+                LOG.debug("Closing the input stream");
+                fsDis.close();
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);

Review Comment:
   In our testing with DefaultRollingpolicy - the recovery is not really possible as the inprogress that was marked would have already been committed in the next checkpoint. Only with policy as Checkpoint based rolling policy we can achieve this . So basically when moving to an older checkpoint - I believe  yes if we fail in a delete and rename step it is a non recoverable error but we can always move to latest checkpoint. But am not very sure how this recovery will be used in case of FS based sinks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1060844881


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        out = fs.append(tempFile);
+        if (out.getPos() == 0) {
+            // In ABFS when we try to append we don't account for the initial file size like we do
+            // in DFS.
+            // So we explicitly store this and when we do a persist call we make use of it.
+            // This we have raised a bug in ABFS hadoop driver side. Once fixed this will not be
+            // needed. So it should be ok to put this in side the 'if' check.
+            initialFileSize = fs.getFileStatus(tempFile).getLen();
+        }
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            int readBytes = -1;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                readBytes = fsDis.read(buf, 0, buf.length);
+                if (readBytes != -1) {
+                    remaining -= readBytes;
+                    LOG.info("Bytes remaining to read {}", remaining);
+                    fsDataOutputStream.write(buf, 0, readBytes);
+                    dataWritten += readBytes;
+                    LOG.info("Successfully wrote {} bytes of data", dataWritten);
+                } else {
+                    LOG.debug("Reached the end of the file");
+                    remaining = 0;
+                }
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+            if (fsDis != null) {
+                LOG.debug("Closing the input stream");
+                fsDis.close();
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);

Review Comment:
   Now I have handled this case by checking if the '.rename' file is available and if that is really of the expected length just go with rename. This should solve the problem of where the failure happens between delete and rename. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1375987259

   https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/#important-considerations
   Here we say that we support S3, HDFS and local. Probably we should add ABFS. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052088521


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);

Review Comment:
   Yes. Will update it. Its a good way to handle EOF. Also am now adding 'close' of the input stream. It might not be a critical one but better to do it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052900374


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Got your question. I checked with the storage driver team (hadoop based storage driver), they said currently there is no direct truncate support. So the manual truncation is the option over here. A generic question though @xintongsong - In case of ORC/Parquet format does the file recovery flow take care of the truncate or those formats already are aware of the last committed offset? This is a question out of this ABFS support PR (in general)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1369490921

   Hi @ramkrish86,
   
   How are things going? I briefly checked the PR and it seems most of the comments are addressed, except for the one about deduplicating `HadoopRecoverableFsDataOutputStream` and `AzureBlobFsRecoverableDataOutputStream`. Please let me know if you're ready for another round of review.
   
   BTW, do you need this feature to be included in Flink 1.17? If yes, the feature freeze date for 1.17 is Jan 31st, 4 weeks from now. However, I have a vacation plan and won't be responsible in the last 2 weeks of the 4. So we probably would need to finalize this PR, as well as updating the documentations, in the next 2 weeks. If there's no such a strong demand to see this in 1.17, take your time then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] anoopsjohn commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
anoopsjohn commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051328889


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);
+                remaining -= buf.length;
+                LOG.info("Bytes remaining to read {}", remaining);
+                fsDataOutputStream.write(buf);
+                dataWritten += buf.length;
+                LOG.info("Successfully wrote {} bytes of data {}", dataWritten);
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);
+    }
+
+    private void rename(FileSystem fs, Path renameTempPath) throws IOException {
+        LOG.info("Renaming the temp rename file {} back to tempFile {}", renameTempPath, tempFile);
+        try {
+            boolean result = fs.rename(renameTempPath, tempFile);
+            if (!result) {
+                LOG.error(
+                        "Unable to recover. Rename operation failed {} to {}",
+                        renameTempPath,
+                        tempFile);
+                throw new IOException("Unable to recover. Rename operation failed");
+            } else {
+                LOG.info("Rename was successful");
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Renaming of tempFile did not happen after truncating {} to {}",
+                    renameTempPath,
+                    tempFile);
+            throw e;
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.hflush();
+    }
+
+    @Override
+    public void sync() throws IOException {
+        out.hflush();

Review Comment:
   Minor: Here we handle ABFS OS and you can call hsync alone.  Anyways both APIs doing same in ABFS case.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        out = fs.append(tempFile);
+        if (out.getPos() == 0) {
+            // In ABFS when we try to append we don't account for the initial file size like we do
+            // in DFS.
+            // So we explicitly store this and when we do a persist call we make use of it.
+            // This we have raised a bug in ABFS hadoop driver side. Once fixed this will not be
+            // needed. So it should be ok to put this in side the 'if' check.
+            initialFileSize = fs.getFileStatus(tempFile).getLen();
+        }
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            int readBytes = -1;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                readBytes = fsDis.read(buf, 0, buf.length);
+                if (readBytes != -1) {
+                    remaining -= readBytes;
+                    LOG.info("Bytes remaining to read {}", remaining);
+                    fsDataOutputStream.write(buf, 0, readBytes);
+                    dataWritten += readBytes;
+                    LOG.info("Successfully wrote {} bytes of data", dataWritten);
+                } else {
+                    LOG.debug("Reached the end of the file");
+                    remaining = 0;
+                }
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+            if (fsDis != null) {
+                LOG.debug("Closing the input stream");
+                fsDis.close();
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);

Review Comment:
   What if there is a fail in between these steps of delete and rename?  Later how it will be possible to rollback.  The truncate should be able to handle such case?



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);

Review Comment:
   What if an attempt of recover failed itself in between?  That will be retried again later and so a similar another truncate attempt?  If so, we might have to do a cleanup of *.rename temp file itself. May be its ok as default FileSystem behave is overwrite.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1361723105

   The current failure seems to be unrelated. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052089207


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Can you clarify here as what you mean by local system? The temp file will be created in the same Remote ABFS system only but the content is truncated and finally gets renamed. (that is why there is a delete and then rename). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1358726442

   > Mainly 2 TODOs. One is the 2MB buffer and next is the flush support. I think for now it should be good. But I can raise TODO JIRAs and work on it later. Is that ok ?
   
   I'm fine with the plan. Just trying to understand the plan. I have no strong opinions on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051963544


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFileSystem.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Wraps the hadoop's file system to create AzureBlobFileSystem. */
+@Internal
+class AzureBlobFileSystem extends HadoopFileSystem {
+    /**
+     * Wraps the given Hadoop File System object as a Flink File System object. The given Hadoop
+     * file system object is expected to be initialized already.
+     *
+     * @param hadoopFileSystem The Azure Blob FileSystem that will be used under the hood.
+     */
+    public AzureBlobFileSystem(FileSystem hadoopFileSystem) {

Review Comment:
   Ok sure . will change it. 



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)

Review Comment:
   Sure will change it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1059863005


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);

Review Comment:
   Not sure. May be in testing if this case happens will handle it of removing the *.rename file once. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1060404322


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {

Review Comment:
   > What specifically you are talking about here?
   
   @ramkrish86, I meant this one. I guess it's overlooked because github hides it when there's too many comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1369511575

   @xintongsong  - We have almost completed the testing part also. I will update the PR in a day or two time frame. Am checking with other comments that Anoop had added over here. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1061334641


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {

Review Comment:
   @xintongsong  - Can you pls check if this refactoring is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052787197


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   I meant the way we write the truncated content to the renamed temp file. We first read the data from the temp file into the memory, then write the data from memory to the renamed temp file. Here the memory is where the Flink file sink operator is executed, while both temp file and renamed temp file are on the remote ABFS. Data transmissions between the memory and the ABFS can be expensive, at least more expensive than data transmissions within ABFS. So I just wonder if there's anyway to copy data directly from the temp file to the renamed temp file, without going through the local memory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1362444049

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1376671834

   > @xintongsong - Also want to know about this https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/filesystems/overview/. Now since ABFS support is available in FileSink (after this PR gets merged), shall we have the similar support for table store too?
   
   Checked with @JingsongLi about this. Currently, when using Flink + FTS, all file systems in that flink supports are supported. However, when using other FTS with other engines (Spark / Hive / Trino), further development is required on the FTS side, mostly related to dependency bundling and conflict resolving.
   
   That is to say, with what we currently have, there's no need to update the FTS documentation. And if we want to support FTS + ABFS + Spark/Hive/Trino, we would need to open another PR to the FTS project, which is definitely welcome.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1376626207

   > @xintongsong - Also want to know about this https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/filesystems/overview/. Now since ABFS support is available in FileSink (after this PR gets merged), shall we have the similar support for table store too?
   
   Good question. From the documentation, it seems not all file systems that FilsSink supports are supported by FTS. I'll talk to someone from the FTS team about this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1064353993


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.BaseHadoopFsRecoverableFsDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream
+        extends BaseHadoopFsRecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+    private static final String RENAME = ".rename";
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;

Review Comment:
   We use `@VisibleForTesting` for fields that have broader visibility then needed for testing purpose.



##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/** Base class for ABFS and Hadoop recoverable stream. */
+@Internal
+public abstract class BaseHadoopFsRecoverableFsDataOutputStream
+        extends RecoverableFsDataOutputStream {
+
+    protected FileSystem fs;
+
+    protected Path targetFile;
+
+    protected Path tempFile;
+
+    protected FSDataOutputStream out;
+
+    // In ABFS outputstream we need to add this to the current pos
+    protected long initialFileSize = 0;
+
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public abstract void flush() throws IOException;
+
+    @Override
+    public void sync() throws IOException {
+        out.hflush();
+        out.hsync();
+    }

Review Comment:
   1. IIUC, @anoopsjohn 's comment was that you can call `hsync` without calling `hflush` in `sync()` for ABFS. It was not about ABFS should call `hsync` in `flush()`. Though the two methods do the same thing, according to him.
   2. Not needing to call `hflush` before `hsync` sounds to be a specialty of ABFS. We probably should make `AzureBlobFsRecoverableDataOutputStream` override the `flush/sync` method, rather than make the method abstract in the base class.



##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableSerializer.java:
##########
@@ -33,7 +33,7 @@
 @Internal
 class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
 
-    static final HadoopRecoverableSerializer INSTANCE = new HadoopRecoverableSerializer();
+    public static final HadoopRecoverableSerializer INSTANCE = new HadoopRecoverableSerializer();

Review Comment:
   No longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1376622507

   > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/#important-considerations Here we say that we support S3, HDFS and local. Probably we should add ABFS.
   
   Nice catch. I guess we should add both ABFS and WASB here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1372092968

   @ramkrish86, thanks for the notice. I'll try to find time for this asap.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1376718888

   > That is to say, with what we currently have, there's no need to update the FTS documentation. And if we want to support FTS + ABFS + Spark/Hive/Trino, we would need to open another PR to the FTS project, which is definitely welcome.
   
   Perfect. Will keep a watch on this and we will raise PR accordingly. Thanks once again @xintongsong 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1357107009

   @xintongsong  - A gentle ping!!!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1053072721


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Bulk-encoded formats (ORC, Parquet, etc.) do not need truncation, because in-progress files are guaranteed to be finished / flushed on checkpoints. Truncation is only needed when an in-progress file can live through multiple checkpoints, which is only supported for row-encoded formats. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1359402962

   Found an issue with 3.3.2 and above ABFS version. The test case I made it work functionally by changing the hadoop version to 3.3.1 and it works fine. A minor config update might be needed. Will test and confirm  back . 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1053150261


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Thanks @xintongsong  for this info. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051953491


##########
flink-filesystems/flink-azure-fs-hadoop/pom.xml:
##########
@@ -185,13 +185,6 @@ under the License.
 								</relocation>
 							</relocations>
 							<filters>
-								<filter>
-									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
-									<excludes>
-										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
-										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
-									</excludes>
-								</filter>

Review Comment:
   Since currently we are extending these HadoopRecoverable, this not being in the jar while I submit the job it is not able to pick it up. This was the initial problem that users were reporting too. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1060555866


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);

Review Comment:
   Can we resolve this if rename by default is with 'overwrite'?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1060549083


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {

Review Comment:
   
   I have tried refactoring this . I am not able to remove HadoopFsRecoverable totally as it is also getting passed in the constructor. But avoided some duplication between HadoopRecoverableFsDataOutputStream and AzureBlobFsRecoverableDataOutputStream



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] anoopsjohn commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
anoopsjohn commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1061619023


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);

Review Comment:
   Yes.  May be u can add some code level comments abt this case. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1375355317

   @xintongsong  - Thanks for the approval. Heartfelt thanks for shepherding and helping with the PR reviews. 
   BTW want to bring to your notice that before raising this PR I had asked a query in the dev@ about running the Azure tests in the CI. I was referred to https://cwiki.apache.org/confluence/display/FLINK/Continuous+Integration and also was told to contact Martjin. Seems the Azure IT tests runs only on CI for master. So may be after intergration we might have to check if the tests really run in CI. However I have ensured the tests  have passed successfully in my local.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1375993373

   @xintongsong  - Also want to know about this https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/filesystems/overview/.
   Now since ABFS support is available in FileSink (after this PR gets merged), shall we have the similar support for table store too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1375387835

   @ramkrish86,
   It was nice to cooperate with you. Thanks for the contribution.
   
   Concerning the IT tests, I think they are only available to the cron builds for master and release branches (like [this](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44569&view=results)). We have release managers monitoring the the cron builds and reporting issues to jira. I'll also pay attention to this in the first couple of days after merging this.
   
   Just to double check. I thought we also need to update the user documentation for this change. However, I just checked the documentation and it seems never mentioned that `abfs://` doesn't work with the file sink. So nothing needs to be changed for the documentation, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051964739


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);
+                remaining -= buf.length;
+                LOG.info("Bytes remaining to read {}", remaining);
+                fsDataOutputStream.write(buf);
+                dataWritten += buf.length;
+                LOG.info("Successfully wrote {} bytes of data {}", dataWritten);

Review Comment:
   Done. Great catch. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051919235


##########
flink-filesystems/flink-azure-fs-hadoop/pom.xml:
##########
@@ -185,13 +185,6 @@ under the License.
 								</relocation>
 							</relocations>
 							<filters>
-								<filter>
-									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
-									<excludes>
-										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
-										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
-									</excludes>
-								</filter>

Review Comment:
   Just trying to understand, why do we need this change?



##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java:
##########
@@ -37,7 +37,7 @@
 public class HadoopFileSystem extends FileSystem {
 
     /** The wrapped Hadoop File System. */
-    private final org.apache.hadoop.fs.FileSystem fs;
+    protected final org.apache.hadoop.fs.FileSystem fs;

Review Comment:
   This change is unnecessary. You can use `getHadoopFileSystem()` where it's needed.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)

Review Comment:
   Can be package private.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);
+                remaining -= buf.length;
+                LOG.info("Bytes remaining to read {}", remaining);
+                fsDataOutputStream.write(buf);
+                dataWritten += buf.length;
+                LOG.info("Successfully wrote {} bytes of data {}", dataWritten);

Review Comment:
   2 placeholders while only 1 argument is provided.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);

Review Comment:
   Better to check the actual number of bytes being read.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Recoverable writer for AzureBlob file system. */
+public class AzureBlobRecoverableWriter extends HadoopRecoverableWriter {
+    /**
+     * Creates a new Recoverable writer.
+     *
+     * @param fs The AzureBlob file system on which the writer operates.
+     */
+    public AzureBlobRecoverableWriter(FileSystem fs) {
+        super(fs);
+    }
+
+    protected void checkSupportedFSSchemes(org.apache.hadoop.fs.FileSystem fs) {
+        // This writer is only supported on a subset of file systems
+        if (!("abfs".equalsIgnoreCase(fs.getScheme())
+                || "abfss".equalsIgnoreCase(fs.getScheme()))) {
+            throw new UnsupportedOperationException(
+                    "Recoverable writers on AzureBlob are only supported for ABFS");
+        }
+    }
+
+    @Override
+    protected RecoverableFsDataOutputStream getRecoverableFsDataOutputStream(
+            org.apache.hadoop.fs.Path targetFile, org.apache.hadoop.fs.Path tempFile)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, targetFile, tempFile);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return false;
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        return false;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable recoverable)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
+                fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<CommitRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<CommitRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+        return typedSerializer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<ResumeRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<ResumeRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+        return typedSerializer;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }

Review Comment:
   And then we won't need the visibility change for `HadoopRecoverableSerializer`.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFileSystem.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Wraps the hadoop's file system to create AzureBlobFileSystem. */
+@Internal
+class AzureBlobFileSystem extends HadoopFileSystem {
+    /**
+     * Wraps the given Hadoop File System object as a Flink File System object. The given Hadoop
+     * file system object is expected to be initialized already.
+     *
+     * @param hadoopFileSystem The Azure Blob FileSystem that will be used under the hood.
+     */
+    public AzureBlobFileSystem(FileSystem hadoopFileSystem) {

Review Comment:
   Can be package private.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Recoverable writer for AzureBlob file system. */
+public class AzureBlobRecoverableWriter extends HadoopRecoverableWriter {
+    /**
+     * Creates a new Recoverable writer.
+     *
+     * @param fs The AzureBlob file system on which the writer operates.
+     */
+    public AzureBlobRecoverableWriter(FileSystem fs) {
+        super(fs);
+    }
+
+    protected void checkSupportedFSSchemes(org.apache.hadoop.fs.FileSystem fs) {
+        // This writer is only supported on a subset of file systems
+        if (!("abfs".equalsIgnoreCase(fs.getScheme())
+                || "abfss".equalsIgnoreCase(fs.getScheme()))) {
+            throw new UnsupportedOperationException(
+                    "Recoverable writers on AzureBlob are only supported for ABFS");
+        }
+    }
+
+    @Override
+    protected RecoverableFsDataOutputStream getRecoverableFsDataOutputStream(
+            org.apache.hadoop.fs.Path targetFile, org.apache.hadoop.fs.Path tempFile)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, targetFile, tempFile);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return false;
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        return false;
+    }

Review Comment:
   ```suggestion
   ```



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)

Review Comment:
   Can be package private.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {

Review Comment:
   This class is very similar to `HadoopRecoverableFsDataOutputStream`. The only differences are in the initialization and the committer implementation. I wonder if they can be further deduplicated. I'm particularly interested in hiding `HadoopFsRecoverable` from this submodule.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Recoverable writer for AzureBlob file system. */
+public class AzureBlobRecoverableWriter extends HadoopRecoverableWriter {
+    /**
+     * Creates a new Recoverable writer.
+     *
+     * @param fs The AzureBlob file system on which the writer operates.
+     */
+    public AzureBlobRecoverableWriter(FileSystem fs) {
+        super(fs);
+    }
+
+    protected void checkSupportedFSSchemes(org.apache.hadoop.fs.FileSystem fs) {
+        // This writer is only supported on a subset of file systems
+        if (!("abfs".equalsIgnoreCase(fs.getScheme())
+                || "abfss".equalsIgnoreCase(fs.getScheme()))) {
+            throw new UnsupportedOperationException(
+                    "Recoverable writers on AzureBlob are only supported for ABFS");
+        }
+    }
+
+    @Override
+    protected RecoverableFsDataOutputStream getRecoverableFsDataOutputStream(
+            org.apache.hadoop.fs.Path targetFile, org.apache.hadoop.fs.Path tempFile)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, targetFile, tempFile);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return false;
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        return false;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable recoverable)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
+                fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<CommitRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<CommitRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+        return typedSerializer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<ResumeRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<ResumeRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+        return typedSerializer;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }

Review Comment:
   ```suggestion
   ```



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);
+            // 2 MB buffers. TODO : Make this configurable
+            long remaining = recoverable.offset();
+            byte[] buf = null;
+            long dataWritten = 0;
+            while (remaining != 0) {
+                if (minBufferLength < remaining) {
+                    buf = new byte[minBufferLength];
+                } else {
+                    buf = new byte[(int) remaining];
+                }
+                fsDis.read(buf, 0, buf.length);
+                remaining -= buf.length;
+                LOG.info("Bytes remaining to read {}", remaining);
+                fsDataOutputStream.write(buf);
+                dataWritten += buf.length;
+                LOG.info("Successfully wrote {} bytes of data {}", dataWritten);
+            }
+            // TODO : Support intermediate flush?
+            LOG.info("Closing the temp rename file {}", renameTempPath);
+            fsDataOutputStream.close();
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Exception while trying to truncate the temp file {}",
+                    tempFile);
+            // We cannot recover. This we can control if user does not want this??
+            throw e;
+        }
+        try {
+            LOG.info("Deleting the actual temp file {}", tempFile);
+            fs.delete(tempFile, false);
+        } catch (IOException e) {
+            LOG.error("Unable to recover. Error while deleting the temp file {}", tempFile);
+            // unable to recover.
+            throw e;
+        }
+        rename(fs, renameTempPath);
+    }
+
+    private void rename(FileSystem fs, Path renameTempPath) throws IOException {
+        LOG.info("Renaming the temp rename file {} back to tempFile {}", renameTempPath, tempFile);
+        try {
+            boolean result = fs.rename(renameTempPath, tempFile);
+            if (!result) {
+                LOG.error(
+                        "Unable to recover. Rename operation failed {} to {}",
+                        renameTempPath,
+                        tempFile);
+                throw new IOException("Unable to recover. Rename operation failed");
+            } else {
+                LOG.info("Rename was successful");
+            }
+        } catch (IOException e) {
+            LOG.error(
+                    "Unable to recover. Renaming of tempFile did not happen after truncating {} to {}",
+                    renameTempPath,
+                    tempFile);
+            throw e;
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.hflush();
+    }
+
+    @Override
+    public void sync() throws IOException {
+        out.hflush();
+        out.hsync();
+    }
+
+    @Override
+    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+        sync();
+        return new HadoopFsRecoverable(targetFile, tempFile, getPos() + initialFileSize);
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        final long pos = getPos();
+        close();
+        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
+                fs, new HadoopFsRecoverable(targetFile, tempFile, pos + initialFileSize));
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.close();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Committer
+    // ------------------------------------------------------------------------
+
+    /**
+     * Implementation of a committer for the Hadoop File System abstraction. This implementation
+     * commits by renaming the temp file to the final file path. The temp file is truncated before
+     * renaming in case there is trailing garbage data.
+     */
+    static class ABFSCommitter implements Committer {
+
+        private final FileSystem fs;
+        private final HadoopFsRecoverable recoverable;
+
+        ABFSCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
+            this.fs = checkNotNull(fs);
+            this.recoverable = checkNotNull(recoverable);
+        }
+
+        @Override
+        public void commit() throws IOException {
+            final Path src = recoverable.tempFile();
+            final Path dest = recoverable.targetFile();
+            final long expectedLength = recoverable.offset();
+            FileStatus srcStatus = null;
+            try {
+                srcStatus = fs.getFileStatus(src);
+            } catch (FileNotFoundException fnfe) {
+                // srcStatus will be null
+            } catch (IOException e) {
+                throw new IOException("Cannot clean commit: Staging file does not exist.");
+            }
+            if (srcStatus != null) {
+                LOG.debug(
+                        "The srcStatus is {} and exp length is {}",
+                        srcStatus.getLen(),
+                        expectedLength);
+                if (srcStatus.getLen() != expectedLength) {
+                    LOG.error(
+                            "The src file {} with length {} does not match the expected length {}",
+                            src,
+                            srcStatus.getLen(),
+                            expectedLength);
+                    throw new IOException(
+                            "The src file "
+                                    + src
+                                    + " with length "
+                                    + srcStatus.getLen()
+                                    + " "
+                                    + "does not match the expected length "
+                                    + expectedLength);
+                }
+                try {
+                    fs.rename(src, dest);
+                } catch (IOException e) {
+                    throw new IOException(
+                            "Committing file by rename failed: " + src + " to " + dest, e);
+                }
+            } else if (!fs.exists(dest)) {
+                // neither exists - that can be a sign of
+                //   - (1) a serious problem (file system loss of data)
+                //   - (2) a recovery of a savepoint that is some time old and the users
+                //         removed the files in the meantime.
+
+                // TODO how to handle this?
+                // We probably need an option for users whether this should log,
+                // or result in an exception or unrecoverable exception

Review Comment:
   I think we should explicitly throw an exception here, because the data cannot be committed.



##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   This means that, despite the src and dest files are on the same remote file system, all the data being copied have to go through the local system where this piece of code is executed. I wonder if there's a better way doing this. (Maybe not, just trying to double-check on this.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on PR #21508:
URL: https://github.com/apache/flink/pull/21508#issuecomment-1357757731

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1052900496


##########
flink-filesystems/flink-azure-fs-hadoop/pom.xml:
##########
@@ -185,13 +185,6 @@ under the License.
 								</relocation>
 							</relocations>
 							<filters>
-								<filter>
-									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
-									<excludes>
-										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
-										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
-									</excludes>
-								</filter>

Review Comment:
   Can I resolve this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1053073322


##########
flink-filesystems/flink-azure-fs-hadoop/pom.xml:
##########
@@ -185,13 +185,6 @@ under the License.
 								</relocation>
 							</relocations>
 							<filters>
-								<filter>
-									<artifact>org.apache.flink:flink-hadoop-fs</artifact>
-									<excludes>
-										<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
-										<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
-									</excludes>
-								</filter>

Review Comment:
   Sure. Resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1053150962


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Can we resolve this conversation also @xintongsong - How ever after our tests I can update here before we want to commit this - if that is fine with you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xintongsong commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
xintongsong commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1053172401


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream} for AzureBlob's file system
+ * abstraction.
+ */
+@Internal
+public class AzureBlobFsRecoverableDataOutputStream extends RecoverableFsDataOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AzureBlobFsRecoverableDataOutputStream.class);
+
+    private final FileSystem fs;
+
+    private final Path targetFile;
+
+    private final Path tempFile;
+
+    private final FSDataOutputStream out;
+
+    // Not final to override in tests
+    public static int minBufferLength = 2097152;
+
+    // init to 0. When ever recovery is done add this to the pos.
+    private long initialFileSize = 0;
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        LOG.debug("The targetFile is {}", targetFile.getName());
+        this.tempFile = checkNotNull(tempFile);
+        LOG.debug("The tempFile is {}", tempFile.getName());
+        this.out = fs.create(tempFile);
+    }
+
+    public AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
+            throws IOException {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(recoverable.targetFile());
+        this.tempFile = checkNotNull(recoverable.tempFile());
+        long len = fs.getFileStatus(tempFile).getLen();
+        LOG.info("The recoverable offset is {} and the file len is {}", recoverable.offset(), len);
+        // Happens when we recover from a previously committed offset. Otherwise this is not
+        // really needed
+        if (len > recoverable.offset()) {
+            truncate(fs, recoverable);
+        } else if (len < recoverable.offset()) {
+            LOG.error(
+                    "Temp file length {} is less than the expected recoverable offset {}",
+                    len,
+                    recoverable.offset());
+            throw new IOException(
+                    "Unable to create recoverable outputstream as length of file "
+                            + len
+                            + " is less than "
+                            + "recoverable offset "
+                            + recoverable.offset());
+        }
+        // In ABFS when we try to append we don't account for the initial file size like we do in
+        // DFS.
+        // So we explicitly store this and when we do a persist call we make use of it.
+        initialFileSize = fs.getFileStatus(tempFile).getLen();
+        out = fs.append(tempFile);
+        LOG.debug("Created a new OS for appending {}", tempFile);
+    }
+
+    private void truncate(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
+        Path renameTempPath = new Path(tempFile.toString() + ".rename");
+        try {
+            LOG.info(
+                    "Creating the temp rename file {} for truncating the tempFile {}",
+                    renameTempPath,
+                    tempFile);
+            FSDataOutputStream fsDataOutputStream = fs.create(renameTempPath);
+            LOG.info("Opening the tempFile {} for truncate", tempFile);
+            FSDataInputStream fsDis = fs.open(tempFile);

Review Comment:
   Sounds like there's no off-the-shelf solution for this at the moment. I think we should at least understand the performance impact and mention it in the user documentation.
   
   Let's leave this conversation unresolved as a reminder for now. This won't block the PR review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ramkrish86 commented on a diff in pull request #21508: [FLINK-30128] Introduce Azure Data Lake Gen2 APIs in the Hadoop Recoverable path

Posted by GitBox <gi...@apache.org>.
ramkrish86 commented on code in PR #21508:
URL: https://github.com/apache/flink/pull/21508#discussion_r1051961474


##########
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobRecoverableWriter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableSerializer;
+import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/** Recoverable writer for AzureBlob file system. */
+public class AzureBlobRecoverableWriter extends HadoopRecoverableWriter {
+    /**
+     * Creates a new Recoverable writer.
+     *
+     * @param fs The AzureBlob file system on which the writer operates.
+     */
+    public AzureBlobRecoverableWriter(FileSystem fs) {
+        super(fs);
+    }
+
+    protected void checkSupportedFSSchemes(org.apache.hadoop.fs.FileSystem fs) {
+        // This writer is only supported on a subset of file systems
+        if (!("abfs".equalsIgnoreCase(fs.getScheme())
+                || "abfss".equalsIgnoreCase(fs.getScheme()))) {
+            throw new UnsupportedOperationException(
+                    "Recoverable writers on AzureBlob are only supported for ABFS");
+        }
+    }
+
+    @Override
+    protected RecoverableFsDataOutputStream getRecoverableFsDataOutputStream(
+            org.apache.hadoop.fs.Path targetFile, org.apache.hadoop.fs.Path tempFile)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, targetFile, tempFile);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return false;
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        return false;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable recoverable)
+            throws IOException {
+        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
+                fs, (HadoopFsRecoverable) recoverable);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<CommitRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<CommitRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+
+        return typedSerializer;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+        @SuppressWarnings("unchecked")
+        SimpleVersionedSerializer<ResumeRecoverable> typedSerializer =
+                (SimpleVersionedSerializer<ResumeRecoverable>)
+                        (SimpleVersionedSerializer<?>) HadoopRecoverableSerializer.INSTANCE;
+        return typedSerializer;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }

Review Comment:
   Got it sure. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org