Posted to user@beam.apache.org by Jean-Baptiste Onofré <jb...@nanthrax.net> on 2017/05/04 15:58:27 UTC

[HEADS UP] Using "new" filesystem layer

Hi guys,

One of the key refactorings/new features we are bringing in the first stable release
is the "new" Beam filesystem layer.

I started to play with it on a couple of use cases I have in beam-samples.

1/ TextIO.write() with unbounded PCollection (stream)

The first use case is TextIO.write() with an unbounded PCollection (good timing, as
we had a question about this on Slack yesterday).

I can confirm that TextIO now supports unbounded PCollections. You have to apply a
Window and "flag" TextIO to use windowing (withWindowedWrites()).

Here's the code snippet:

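import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Read from the JMS queue "BEAM", keep the payload, window it into 10-second
// fixed windows and write windowed output with a fixed number of shards (3).
pipeline
    .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
    .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
      @Override
      public String apply(JmsRecord input) {
        return input.getPayload();
      }
    }))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
    .apply(TextIO.write()
        .to("/home/jbonofre/demo/beam/output/uc2")
        .withWindowedWrites()
        .withNumShards(3));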
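For readers new to fixed windows, here is a minimal plain-Java sketch (no Beam dependency; the class and method names are made up for illustration) of how an element timestamp maps to one of the 10-second windows above, assuming a zero window offset. Each element lands in exactly one window [start, start + size), where start is the timestamp rounded down to a multiple of the window size; with windowed writes, TextIO output is then produced per window.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of fixed (tumbling) windows with a zero offset. */
final class FixedWindowsSketch {

  /** Start of the window containing the timestamp: rounded down to a multiple of size. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    // floorMod also rounds correctly for timestamps before the epoch.
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
  }

  /** Exclusive end of the window containing the timestamp. */
  static Instant windowEnd(Instant timestamp, Duration size) {
    return windowStart(timestamp, size).plus(size);
  }

  public static void main(String[] args) {
    Duration size = Duration.ofSeconds(10);
    for (long seconds : new long[] {3, 9, 10, 27}) {
      Instant t = Instant.ofEpochSecond(seconds);
      System.out.println(t + " -> [" + windowStart(t, size) + ", " + windowEnd(t, size) + ")");
    }
  }
}
```

Timestamps at 3 s and 9 s fall into the same window [0 s, 10 s), while 10 s starts the next one, since window ends are exclusive.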

Thanks to Dan, I found an issue with the JmsIO watermark: as JmsIO uses the JMS
acknowledgement to advance the watermark, the session should not use auto
acknowledgement but client acknowledgement. I'm preparing a PR for JmsIO about this.
However, the "windowed" TextIO works fine.
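To illustrate why the acknowledgement mode matters, here is a hypothetical plain-Java toy model (not JmsIO's actual implementation). It assumes a reader that keeps delivered messages pending until a checkpoint is finalized, acknowledges them only at that point, and reports the oldest pending message as its watermark. That pattern only makes sense with client acknowledgement: with auto acknowledgement, the JMS session already acknowledges each message when it is received, so there is nothing left to acknowledge at checkpoint time and the acknowledgement no longer reflects what the pipeline has actually processed.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not JmsIO's code: received messages stay pending until
 * the checkpoint is finalized, and only then are they acknowledged (client ack).
 * Assumes messages arrive in timestamp order.
 */
final class PendingAckReader {
  private final List<Instant> pending = new ArrayList<>();
  private int acknowledgedCount = 0;
  private Instant lastSeen = Instant.MIN;

  /** A message is delivered but, with client acknowledgement, not acknowledged yet. */
  void receive(Instant messageTimestamp) {
    pending.add(messageTimestamp);
    lastSeen = messageTimestamp;
  }

  /** Checkpoint finalized: acknowledge everything pending in one go. */
  void finalizeCheckpoint() {
    acknowledgedCount += pending.size();
    pending.clear();
  }

  /** Watermark: the oldest pending message, or the last seen timestamp if none is pending. */
  Instant watermark() {
    return pending.isEmpty() ? lastSeen : pending.get(0);
  }

  int acknowledgedCount() {
    return acknowledgedCount;
  }

  public static void main(String[] args) {
    PendingAckReader reader = new PendingAckReader();
    reader.receive(Instant.ofEpochSecond(1));
    reader.receive(Instant.ofEpochSecond(2));
    System.out.println(reader.watermark() + " acked=" + reader.acknowledgedCount());
    reader.finalizeCheckpoint();
    System.out.println(reader.watermark() + " acked=" + reader.acknowledgedCount());
  }
}
```

Until finalizeCheckpoint() is called, the watermark stays at the oldest unacknowledged message; afterwards it moves to the last message seen.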

2/ Beam HDFS filesystem

The other use case is using the "new" Beam filesystem layer with TextIO, specifically HDFS.

So, in my pipeline, I'm using:

	.apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));

In my pom.xml, I define both Beam hadoop-file-system and hadoop-client dependencies:

         <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
             <version>0.7.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>2.7.3</version>
         </dependency>

Unfortunately, when starting the pipeline, I get:

Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
	at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
	at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
	at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
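The exception is raised while TextIO.write().to(...) converts the output path into a resource: the "hdfs" scheme of the path has to match a registered filesystem. Below is a toy model of that lookup in plain Java (hypothetical class, not Beam's code; it parses the scheme with java.net.URI for simplicity). It assumes, in line with Luke's answer in this thread, that the Hadoop registrar only provides the hdfs scheme once a Hadoop configuration is available, which would explain the error when none is set.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical scheme -> filesystem lookup mirroring the "Unable to find registrar" error. */
final class SchemeRegistrySketch {
  private final Map<String, String> fileSystemsByScheme = new HashMap<>();

  SchemeRegistrySketch(List<String> hadoopConfigurations) {
    // The local filesystem is always available.
    fileSystemsByScheme.put("file", "LocalFileSystem");
    // Assumption: the Hadoop registrar contributes "hdfs" only when a configuration is present.
    if (!hadoopConfigurations.isEmpty()) {
      fileSystemsByScheme.put("hdfs", "HadoopFileSystem");
    }
  }

  String fileSystemFor(String spec) {
    String scheme = URI.create(spec).getScheme();
    // Paths without a scheme, such as /home/..., are treated as local files.
    String key = scheme == null ? "file" : scheme.toLowerCase(Locale.ROOT);
    String fileSystem = fileSystemsByScheme.get(key);
    if (fileSystem == null) {
      throw new IllegalStateException("Unable to find registrar for " + key);
    }
    return fileSystem;
  }
}
```

With an empty configuration list, a local path resolves fine while "hdfs://localhost/path" fails with the same message as in the stack trace.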

I'm going to investigate tonight and will let you know.

Regards
JB
-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [HEADS UP] Using "new" filesystem layer

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks Luke,

I'm going to try it and will let you know.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point, it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto-detect
> the Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.

Re: [HEADS UP] Using "new" filesystem layer

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Yes, I forgot to update the thread: that's what I'm doing, and it works fine.

Thanks!
Regards
JB

On 05/05/2017 08:28 PM, Lukasz Cwik wrote:
> JB, the ConfigurationLocator is the default instance factory for the
> hdfsConfiguration, so as long as HADOOP_CONF_DIR/YARN_CONF_DIR is correctly
> specified, you should only need to write:
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

Re: [HEADS UP] Using "new" filesystem layer

Posted by Lukasz Cwik <lc...@google.com>.
JB, the ConfigurationLocator is the default instance factory for the
hdfsConfiguration, so as long as HADOOP_CONF_DIR/YARN_CONF_DIR is correctly
specified, you should only need to write:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
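Luke's point can be pictured with a plain-Java analogy (hypothetical class, not Beam's generated options proxy; strings stand in for Hadoop Configuration objects): a getter backed by a default factory calls that factory on first read when no value was set explicitly. In this sketch the result is kept, so the factory runs at most once, and an explicitly set value always wins. That is why a manual call to ConfigurationLocator.create(options), whose result is not passed anywhere in JB's snippet, should not be needed.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical stand-in for an options interface whose property has a default
 * instance factory. Strings stand in for Hadoop Configuration objects.
 */
final class LazyDefaultOptions {
  private final Function<LazyDefaultOptions, List<String>> defaultFactory;
  private List<String> hdfsConfiguration; // null until set explicitly or read once

  LazyDefaultOptions(Function<LazyDefaultOptions, List<String>> defaultFactory) {
    this.defaultFactory = defaultFactory;
  }

  List<String> getHdfsConfiguration() {
    if (hdfsConfiguration == null) {
      // Nothing set explicitly: ask the default factory and keep its result.
      hdfsConfiguration = defaultFactory.apply(this);
    }
    return hdfsConfiguration;
  }

  void setHdfsConfiguration(List<String> value) {
    hdfsConfiguration = value;
  }

  public static void main(String[] args) {
    LazyDefaultOptions options =
        new LazyDefaultOptions(o -> Collections.singletonList("conf from HADOOP_CONF_DIR"));
    System.out.println(options.getHdfsConfiguration());
  }
}
```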

On Fri, May 5, 2017 at 6:23 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi guys,
>
> Thanks Luke, I updated my pipeline like this:
>
>         HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
>                 .withValidation().as(HadoopFileSystemOptions.class);
>         HadoopFileSystemOptions.ConfigurationLocator locator =
>                 new HadoopFileSystemOptions.ConfigurationLocator();
>         List<Configuration> configurations = locator.create(options);
>         Pipeline pipeline = Pipeline.create(options);
>         ...
>         pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
>
> I defined the HADOOP_CONF_DIR environment variable pointing to the folder
> containing hdfs-site.xml, and it works fine.
>
> I saw that the README.md in hadoop-file-system is not up to date. I'm
> preparing a PR for that, and I'm also preparing quick documentation about
> HDFS support.
>
> Regards
> JB

Re: [HEADS UP] Using "new" filesystem layer

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi guys,

Thanks Luke, I updated my pipeline like this:

        HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(HadoopFileSystemOptions.class);
        HadoopFileSystemOptions.ConfigurationLocator locator =
                new HadoopFileSystemOptions.ConfigurationLocator();
        List<Configuration> configurations = locator.create(options);
        Pipeline pipeline = Pipeline.create(options);
        ...
        pipeline.apply(TextIO.write().to("hdfs://localhost/path"));

I defined the HADOOP_CONF_DIR environment variable pointing to the folder containing
hdfs-site.xml, and it works fine.

I saw that the README.md in hadoop-file-system is not up to date. I'm preparing a PR
for that, and I'm also preparing quick documentation about HDFS support.

Regards
JB

On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point, it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto-detect
> the Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.

Re: [HEADS UP] Using "new" filesystem layer

Posted by Lukasz Cwik <lc...@google.com>.
JB, for your second point, it seems as though you may not be setting the
Hadoop configuration on HadoopFileSystemOptions.
Also, I just merged https://github.com/apache/beam/pull/2890 which will
auto-detect the Hadoop configuration based upon your HADOOP_CONF_DIR and
YARN_CONF_DIR environment variables.
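The auto-detection described here can be pictured with a plain-Java sketch (hypothetical names; it builds no Hadoop Configuration objects and is not the code from that PR): read HADOOP_CONF_DIR and YARN_CONF_DIR, drop duplicates when both point to the same directory, and collect the Hadoop XML files found there. Which files are considered (core-site.xml and hdfs-site.xml here) is an assumption; the environment is passed in as a map so the logic can be exercised without touching the real environment.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of locating Hadoop configuration files from environment variables. */
final class HadoopConfDirSketch {
  // Assumed variable and file names; see the lead-in.
  private static final String[] ENV_VARS = {"HADOOP_CONF_DIR", "YARN_CONF_DIR"};
  private static final String[] CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  /** Returns the configuration files found, one list per distinct configuration directory. */
  static List<List<File>> locate(Map<String, String> env) {
    // A set keeps directories unique when both variables point to the same place.
    Set<File> dirs = new LinkedHashSet<>();
    for (String var : ENV_VARS) {
      String value = env.get(var);
      if (value != null && !value.trim().isEmpty()) {
        dirs.add(new File(value.trim()).getAbsoluteFile());
      }
    }
    List<List<File>> result = new ArrayList<>();
    for (File dir : dirs) {
      List<File> found = new ArrayList<>();
      for (String name : CONF_FILES) {
        File file = new File(dir, name);
        if (file.isFile()) {
          found.add(file);
        }
      }
      // Directories without any known configuration file are ignored.
      if (!found.isEmpty()) {
        result.add(found);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(locate(System.getenv()));
  }
}
```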
