Posted to user@flink.apache.org by Dan Diephouse <da...@netzooid.com> on 2020/10/07 03:53:17 UTC

S3 StreamingFileSink issues

First, let me say, Flink is super cool - thanks everyone for making my life
easier in a lot of ways! Wish I had this 10 years ago....

On to the fun stuff: I am attempting to use the StreamingFileSink with S3.
Note that Flink is embedded in my app, not running as a standalone cluster.

I am having a few problems, which I have illustrated in the small test case
below.

1) After my job finishes, data never gets committed to S3. Looking through
the code, I've noticed that data gets flushed to disk, but the multi-part
upload is never finished. Even though my data doesn't hit the min part
size, I would expect that if my job ends, my data should get uploaded since
the job is 100% done.

I am also seeing data not being uploaded while the job is running - but I
haven't been able to distill that down to a simple test case, so I thought
I'd start here.

2) The S3 filesystem does not pull credentials from the Flink Configuration
when running in embedded mode. I have a workaround for this, but it is
ugly. If you comment out the line in the test case that applies this
workaround, you will end up with a "java.net.SocketException: Host is down".

Can anyone shed light on these two issues? Thanks!

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.junit.jupiter.api.Test;

public class S3Test {
    @Test
    public void whyDoesntThisWork() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", MemoryStateBackendFactory.class.getName());
        configuration.setString("s3.access.key", "****");
        configuration.setString("s3.secret.key", "****");

        // If I don't do this, the S3 filesystem never gets the credentials
        FileSystem.initialize(configuration, null);

        LocalStreamEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

        StreamingFileSink<String> s3 = StreamingFileSink
                .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
                .build();

        env.fromElements("string1", "string2")
            .addSink(s3);

        env.execute();

        System.out.println("Done");
    }
}


-- 
Dan Diephouse
@dandiep

Re: S3 StreamingFileSink issues

Posted by Dan Diephouse <da...@netzooid.com>.
FYI - I discovered that if I specify the Hadoop compression codec it works
fine. E.g.:

CompressWriters.forExtractor(new DefaultExtractor()).withHadoopCompression("GzipCodec")

Haven't dug into exactly why yet.


-- 
Dan Diephouse
@dandiep

Re: S3 StreamingFileSink issues

Posted by David Anderson <da...@alpinegizmo.com>.
Looping in @Kostas Kloudas <kk...@apache.org> who should be able to
clarify things.

David


Re: S3 StreamingFileSink issues

Posted by Dan Diephouse <da...@netzooid.com>.
Thanks! I completely missed that in the docs. It's now working; however, it's
not working with compression writers. Someone else noted this issue here:

https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming

Looking at the code, I'm not sure I follow the nuances of why sync()
doesn't simply call flush() in RefCountedBufferingFileStream:

public void sync() throws IOException {
    throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream cannot sync state to S3. " +
        "Use persist() to create a persistent recoverable intermediate point.");
}

If there are any pointers here on what should happen, happy to submit a
patch.
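
A minimal sketch of the contract difference described above, written as a toy model in plain Java rather than Flink code. NoSyncStream, finishWithSync and finishWithFlush are hypothetical names introduced only for illustration. It shows that a writer which calls sync() on a stream that rejects it fails, while a writer that only calls flush() completes. Whether the compression writers actually take the sync() path is an assumption here, not something confirmed in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Toy illustration; NOT Flink code. NoSyncStream and both finish methods are
 * hypothetical stand-ins for a stream whose sync() is unsupported.
 */
class SyncVersusFlushSketch {

    /** An in-memory stream that accepts flush() but refuses sync(). */
    static final class NoSyncStream extends ByteArrayOutputStream {
        void sync() {
            throw new UnsupportedOperationException("cannot sync; use persist() instead");
        }

        @Override
        public void flush() {
            // Nothing to do: the bytes already live in the in-memory buffer.
        }
    }

    /** Finishes a file by calling sync(); returns false if the stream rejects it. */
    static boolean finishWithSync(NoSyncStream out, String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
        try {
            out.sync();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    /** Finishes a file by calling flush() only, which this stream supports. */
    static boolean finishWithFlush(NoSyncStream out, String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
        out.flush();
        return true;
    }

    public static void main(String[] args) {
        System.out.println("sync path ok:  " + finishWithSync(new NoSyncStream(), "string1"));
        System.out.println("flush path ok: " + finishWithFlush(new NoSyncStream(), "string1"));
    }
}
```

The sketch says nothing about durability: flush() succeeding here does not mean the data is recoverable, which is what the exception message points to persist() for.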





-- 
Dan Diephouse
@dandiep

Re: S3 StreamingFileSink issues

Posted by David Anderson <da...@alpinegizmo.com>.
Dan,

The first point you've raised is a known issue: When a job is stopped, the
unfinished part files are not transitioned to the finished state. This is
mentioned in the docs as Important Note 2 [1], and fixing this is waiting
on FLIP-46 [2]. That section of the docs also includes some S3-specific
warnings, but nothing pertaining to managing credentials. Perhaps [3] will
help.

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
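
The known issue above can be illustrated with a toy model of the part-file lifecycle. This is plain Java, not Flink code, and every name in it (PartFileLifecycleSketch, roll, onCheckpointComplete, visibleRecords) is hypothetical. It assumes the behavior described in [1]: part files move from in-progress to pending to finished, and only a successful checkpoint promotes pending files to finished.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the part-file lifecycle; NOT Flink code.
 * All names here are hypothetical and exist only for illustration.
 */
class PartFileLifecycleSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final List<String> records = new ArrayList<>();
        State state = State.IN_PROGRESS;
    }

    private final List<PartFile> files = new ArrayList<>();
    private PartFile current;

    /** Appends a record to the in-progress part file, opening one if needed. */
    void write(String record) {
        if (current == null) {
            current = new PartFile();
            files.add(current);
        }
        current.records.add(record);
    }

    /** Closes the in-progress file; it is now pending, but not yet committed. */
    void roll() {
        if (current != null) {
            current.state = State.PENDING;
            current = null;
        }
    }

    /** Only a completed checkpoint promotes pending files to finished. */
    void onCheckpointComplete() {
        for (PartFile f : files) {
            if (f.state == State.PENDING) {
                f.state = State.FINISHED;
            }
        }
    }

    /** Counts the records held in finished (committed) part files. */
    int visibleRecords() {
        int n = 0;
        for (PartFile f : files) {
            if (f.state == State.FINISHED) {
                n += f.records.size();
            }
        }
        return n;
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch sink = new PartFileLifecycleSketch();
        sink.write("string1");
        sink.write("string2");
        // The bounded job ends here: no roll, no checkpoint, nothing committed.
        System.out.println("visible after job end: " + sink.visibleRecords());

        sink.roll();
        sink.onCheckpointComplete();
        System.out.println("visible after roll + checkpoint: " + sink.visibleRecords());
    }
}
```

In this model a job that writes two elements and then terminates leaves both records in an in-progress file, which matches the symptom in the test case: the data is written locally but never committed.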

