You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by dhurandar S <dh...@gmail.com> on 2020/06/19 19:19:38 UTC

adding s3 object metadata while using StreamFileSink

We are creating files in S3 and we want to update the S3 object metadata
with some security-related information for governance purposes.

Right now Apache Flink totally abstracts how and when S3 object gets
created in the system.

Is there a way that we can pass the S3 object metadata and update it for
the object created.

If not,

How can we know when Apache Flink has created an S3 file. Deterministically.
Since once its created in S3 we can write Java code after that to add those
metadata information?

-- 
Thank you and regards,
Dhurandar

Re: adding s3 object metadata while using StreamFileSink

Posted by Yun Gao <yu...@aliyun.com>.
Hi Dhurandar,

    With my understand I think what you need is to get notified when a file is written successfully (committed) on the S3 FileSystem. However, currently there is no public API for the listener and there an issue tracking it [1].

    With the current version, one possible method comes to me is that may have to use reflection to access some internal states of StreamFileSink to get the committed files. As a whole, you may need to implement a customized StreamingFileSink and override the notifyCheckpointComplete method, where the new S3 file get committed and visible:

class CustomizedStreamingFileSink extends StreamingFileSink {

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 1. First use reflection to get the list of files will be committed in this call.
        // The list of files should be get via StreamingFile -> ( StreamingFileSink Helper if 1.11 is used ) -> Buckets -> activeBuckets (there will be multiple Buckets) -> (for each Bucket) pendingFileRecoverablesPerCheckpoint
        // Then we could get the iterator of pending files to commit in this time via pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)[2]
  // Then you could get the S3 object names via (PendingFileRecover if 1.11 is used) -> CommitRecoverable (Will must be S3Recoverable ) -> objectName.

        super.notifyCheckpointComplete(checkpointId); // Get files committed normally.

        // 3. Then here could start writing meta info for S3 objects recorded in step 1. 
    }
}

For a single file it may get committed multiple times, therefore the writing meta info action must also be able to handle the repeat writing.

Another possible method will be to use a seperate source operator to periodly scans the S3 file system to detect the newly added files and modify their meta data. There should be embedding source function ContinuousFileMonitoringFunction[3] for this work, and I think it might be modified or reused for scanning the files. 

Best,
  Yun 


[1] https://issues.apache.org/jira/browse/FLINK-17900
[2] https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268
[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java


------------------------------------------------------------------
Sender:dhurandar S<dh...@gmail.com>
Date:2020/06/20 03:19:38
Recipient:user<us...@flink.apache.org>; Flink Dev<de...@flink.apache.org>
Theme:adding s3 object metadata while using StreamFileSink

We are creating files in S3 and we want to update the S3 object metadata with some security-related information for governance purposes.

Right now Apache Flink totally abstracts how and when S3 object gets created in the system. 

Is there a way that we can pass the S3 object metadata and update it for the object created.

If not, 

How can we know when Apache Flink has created an S3 file. Deterministically.
Since once its created in S3 we can write Java code after that to add those metadata information?

-- 
Thank you and regards,
Dhurandar



Re: adding s3 object metadata while using StreamFileSink

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Dhurandar,

    With my understand I think what you need is to get notified when a file is written successfully (committed) on the S3 FileSystem. However, currently there is no public API for the listener and there an issue tracking it [1].

    With the current version, one possible method comes to me is that may have to use reflection to access some internal states of StreamFileSink to get the committed files. As a whole, you may need to implement a customized StreamingFileSink and override the notifyCheckpointComplete method, where the new S3 file get committed and visible:

class CustomizedStreamingFileSink extends StreamingFileSink {

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 1. First use reflection to get the list of files will be committed in this call.
        // The list of files should be get via StreamingFile -> ( StreamingFileSink Helper if 1.11 is used ) -> Buckets -> activeBuckets (there will be multiple Buckets) -> (for each Bucket) pendingFileRecoverablesPerCheckpoint
        // Then we could get the iterator of pending files to commit in this time via pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)[2]
  // Then you could get the S3 object names via (PendingFileRecover if 1.11 is used) -> CommitRecoverable (Will must be S3Recoverable ) -> objectName.

        super.notifyCheckpointComplete(checkpointId); // Get files committed normally.

        // 3. Then here could start writing meta info for S3 objects recorded in step 1. 
    }
}

For a single file it may get committed multiple times, therefore the writing meta info action must also be able to handle the repeat writing.

Another possible method will be to use a seperate source operator to periodly scans the S3 file system to detect the newly added files and modify their meta data. There should be embedding source function ContinuousFileMonitoringFunction[3] for this work, and I think it might be modified or reused for scanning the files. 

Best,
  Yun 


[1] https://issues.apache.org/jira/browse/FLINK-17900
[2] https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268
[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java


------------------------------------------------------------------
Sender:dhurandar S<dh...@gmail.com>
Date:2020/06/20 03:19:38
Recipient:user<us...@flink.apache.org>; Flink Dev<de...@flink.apache.org>
Theme:adding s3 object metadata while using StreamFileSink

We are creating files in S3 and we want to update the S3 object metadata with some security-related information for governance purposes.

Right now Apache Flink totally abstracts how and when S3 object gets created in the system. 

Is there a way that we can pass the S3 object metadata and update it for the object created.

If not, 

How can we know when Apache Flink has created an S3 file. Deterministically.
Since once its created in S3 we can write Java code after that to add those metadata information?

-- 
Thank you and regards,
Dhurandar



Re: adding s3 object metadata while using StreamFileSink

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

Maybe a bit crazy idea, but you could also try extending the S3
filesystem and add the metadata there. You could write a thin wrapper
for the existing filesystem. If you'd like to go that route you might
want to check this page[1]. You could use that filesystem with your
custom scheme.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/#file-systems

On 19/06/2020 21:19, dhurandar S wrote:
> We are creating files in S3 and we want to update the S3 object
> metadata with some security-related information for governance purposes.
>
> Right now Apache Flink totally abstracts how and when S3 object gets
> created in the system. 
>
> Is there a way that we can pass the S3 object metadata and update it
> for the object created.
>
> If not, 
>
> How can we know when Apache Flink has created an S3 file.
> Deterministically.
> Since once its created in S3 we can write Java code after that to add
> those metadata information?
>
> -- 
> Thank you and regards,
> Dhurandar
>