Posted to dev@flink.apache.org by cw7k <cw...@yahoo.com.INVALID> on 2018/01/22 22:42:30 UTC

filesystem output partitioning

Hi, I ran the WordCount batch program and noticed the output was split into 5 files. Is there documentation on how the splitting is done and how to tweak it?

On Friday, January 19, 2018, 12:06:45 AM PST, Fabian Hueske <fh...@gmail.com> wrote:
 
 Great! Thanks for reporting back.

2018-01-19 1:43 GMT+01:00 cw7k <cw...@yahoo.com.invalid>:

>  Ok, I have the factory working in the WordCount example.  I had to move
> the factory code and META-INF into the WordCount project.
> For general Flink jobs, I'm assuming that the goal would be to be able to
> import the factory from the job itself instead of needing to copy the
> factory .java file into each project?  If so, any guidelines on how to do
> that?
>
> On Thursday, January 18, 2018, 10:53:32 AM PST, cw7k
> <cw...@yahoo.com.INVALID> wrote:
>
>  Hi, just a bit more info, I have a test function working using oci://,
> based on the S3 test:
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java#L169
> However, when I try to get the WordCount example's WriteAsText to write to
> my new filesystem:
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java#L82
>
> that's where I got the "Could not find a file system implementation" error
> mentioned earlier.
>
>    On Thursday, January 18, 2018, 10:22:57 AM PST, cw7k
> <cw...@yahoo.com.INVALID> wrote:
>
>  Thanks.  I now have the 3 requirements fulfilled but the scheme isn't
> being loaded; I get this error:
> "Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'oci'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded."
> What's the best way to debug the loading of the schemes/filesystems by the
> ServiceLoader?
>
> On Thursday, January 18, 2018, 5:09:10 AM PST, Fabian
> Hueske <fh...@gmail.com> wrote:
>
>  In fact, there are two S3FileSystemFactory classes, one for Hadoop and
> another one for Presto.
> In both cases an external file system class is wrapped in Flink's
> HadoopFileSystem class [1] [2].
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java#L132
> [2]
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java#L131
>
> 2018-01-18 1:24 GMT+01:00 cw7k <cw...@yahoo.com.invalid>:
>
> >  Thanks. I'm looking at the s3 example and I can only find the
> > S3FileSystemFactory but not the File System implementation (subclass
> > of org.apache.flink.core.fs.FileSystem).
> > Is that requirement still needed?
> >
> > On Wednesday, January 17, 2018, 3:59:47 PM PST, Fabian Hueske
> > <fh...@gmail.com> wrote:
> >
> >  Hi,
> >
> > please have a look at this doc page [1].
> > It describes how to add new file system implementations and also how to
> > configure them.
> >
> > Best, Fabian
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/filesystems.html#adding-new-file-system-implementations
> >
> > 2018-01-18 0:32 GMT+01:00 cw7k <cw...@yahoo.com.invalid>:
> >
> > >  Hi, I'm adding support for more cloud storage providers such as Google
> > > (gcs://) and Oracle (oci://).
> > > I have an oci:// test working based on the s3a:// test but when I try it
> > > on an actual Flink job like WordCount, I get this message:
> > > "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> > > find a file system implementation for scheme 'oci'. The scheme is not
> > > directly supported by Flink and no Hadoop file system to support this
> > > scheme could be loaded."
> > > How do I register new schemes into the file system factory?  Thanks.
> > >
> > > On Tuesday, January 16, 2018, 5:27:31 PM PST, cw7k <cw7k@yahoo.com.INVALID>
> > > wrote:
> > >
> > >  Hi, question on this page:
> > > "You need to point Flink to a valid Hadoop configuration..."
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#s3-simple-storage-service
> > > How do you point Flink to the Hadoop config?
> > >
> > > On Saturday, January 13, 2018, 4:56:15 AM PST, Till Rohrmann
> > > <trohrmann@apache.org> wrote:
> > >
> > >  Hi,
> > >
> > > the flink-connector-filesystem module contains the BucketingSink, a
> > > connector with which you can write your data to a file system. It provides
> > > exactly-once processing guarantees and allows writing data to different
> > > buckets [1].
> > >
> > > The flink-filesystems module contains different file system implementations
> > > (like MapR FS, HDFS or S3). If you want to use, for example, the S3 file
> > > system, then there are the flink-s3-fs-hadoop and flink-s3-fs-presto
> > > modules.
> > >
> > > So if you want to write your data to S3 using the BucketingSink, then you
> > > have to add flink-connector-filesystem for the BucketingSink as well as an
> > > S3 file system implementation (e.g. flink-s3-fs-hadoop or
> > > flink-s3-fs-presto).
> > >
> > > Usually, there should be no need to change Flink's filesystem
> > > implementations. If you want to add a new connector, then this would go to
> > > flink-connectors or to Apache Bahir [2].
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html
> > >
> > > [2]
> > > https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/index.html#connectors-in-apache-bahir
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Jan 12, 2018 at 7:22 PM, cw7k <cw...@yahoo.com.invalid> wrote:
> > >
> > > > Hi, I'm trying to understand the difference between the flink-filesystem
> > > > and flink-connector-filesystem.  How is each intended to be used?
> > > > If adding support for a different storage provider that supports HDFS,
> > > > should additions be made to one or the other, or both?  Thanks.

Re: filesystem output partitioning

Posted by Fabian Hueske <fh...@gmail.com>.
In DataSet (batch) programs, FileOutputFormats write one output file for
each parallel operator instance.
If your operator runs with a parallelism of 8, the output is split across 8
files.
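
To make the one-file-per-instance behaviour concrete, here is a minimal sketch in plain Java. It is not Flink code: the class name ParallelOutputSketch, the write method, the numbered file names, and the hash-based routing are all assumptions chosen for illustration. It only models the point that each parallel sink instance owns exactly one output file, so the number of files follows the parallelism rather than the amount of data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a parallel file sink; this is not Flink's implementation.
class ParallelOutputSketch {

    // Splits records across `parallelism` simulated sink instances and
    // returns one entry per instance: file name -> lines written to that file.
    static Map<String, List<String>> write(List<String> records, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        Map<String, List<String>> files = new LinkedHashMap<>();
        // Every instance gets its own file, even if it ends up with no records.
        for (int i = 1; i <= parallelism; i++) {
            files.put(Integer.toString(i), new ArrayList<>());
        }
        for (String record : records) {
            // Assumed routing: the record's hash picks the instance.
            int instance = Math.floorMod(record.hashCode(), parallelism);
            files.get(Integer.toString(instance + 1)).add(record);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "that", "is");
        System.out.println(write(words, 5).size() + " output files at parallelism 5");
        System.out.println(write(words, 1).size() + " output file at parallelism 1");
    }
}
```

In an actual DataSet job, the usual way to change the number of files is to change the parallelism of the sink, for example writeAsText(path).setParallelism(1) for a single file, or to set the job-wide parallelism on the ExecutionEnvironment. Treat this as a pointer to check against the documentation for your Flink version.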
