Posted to user@beam.apache.org by Jean-Baptiste Onofré <jb...@nanthrax.net> on 2017/05/04 15:58:27 UTC
[HEADS UP] Using "new" filesystem layer
Hi guys,
One of the key refactorings/new features we bring in the first stable release is the
"new" Beam filesystem layer.
I started to play with it on a couple of use cases I have in beam-samples.
1/ TextIO.write() with unbounded PCollection (stream)
The first use case is TextIO.write() with an unbounded PCollection (good timing,
as we had a question about this on Slack yesterday).
I confirm that TextIO now supports unbounded PCollections. You have to apply a
window and "flag" TextIO to use windowed writes.
Here's the code snippet:
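import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
pipeline
    // Unbounded source: messages from the "BEAM" JMS queue.
    .apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("BEAM"))
    .apply(MapElements.via(new SimpleFunction<JmsRecord, String>() {
      @Override
      public String apply(JmsRecord input) {
        return input.getPayload();
      }
    }))
    // Split the stream into 10-second fixed windows.
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
    // Windowed writes: each window's contents are written to 3 output shards.
    .apply(TextIO.write()
        .to("/home/jbonofre/demo/beam/output/uc2")
        .withWindowedWrites()
        .withNumShards(3));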
Thanks to Dan, I found an issue with the JmsIO watermark: as JmsIO uses the JMS
acknowledgement to advance the watermark, it should use client acknowledgement
rather than auto acknowledgement. I'm preparing a PR for JmsIO about this.
However, the "windowed" TextIO works fine.
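The FixedWindows.of(Duration.standardSeconds(10)) step assigns each element to a 10-second window based on its timestamp. A minimal, Beam-independent sketch of that arithmetic, assuming windows aligned to the epoch with no offset. FixedWindowSketch and windowFor are hypothetical names (not Beam API), and the sketch uses java.time.Duration instead of Joda's Duration:

```java
import java.time.Duration;

/** Simplified model of fixed windows (hypothetical class, not Beam's FixedWindows). */
public class FixedWindowSketch {
  /** Returns {start, end} in millis of the fixed window containing timestampMillis. */
  static long[] windowFor(long timestampMillis, Duration size) {
    long sizeMillis = size.toMillis();
    // floorMod keeps the result correct for negative timestamps too.
    long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    // Windows are half-open: [start, end).
    return new long[] {start, start + sizeMillis};
  }

  public static void main(String[] args) {
    Duration size = Duration.ofSeconds(10);
    long[] timestamps = {0L, 9_999L, 10_000L, 25_500L};
    for (long ts : timestamps) {
      long[] w = windowFor(ts, size);
      System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
    }
  }
}
```

Running main prints each timestamp with its window, e.g. `9999 -> [0, 10000)` and `10000 -> [10000, 20000)`: an element exactly on a boundary starts the next window. With windowed writes, TextIO produces output per window rather than one set of files for the whole (never-ending) stream.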
2/ Beam HDFS filesystem
The other use case is using the "new" Beam filesystem layer with TextIO, in particular HDFS.
So, in my pipeline, I'm using:
.apply(TextIO.write().to("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
In my pom.xml, I define both the Beam hadoop-file-system and the hadoop-client dependencies:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
  <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>
Unfortunately, when starting the pipeline, I get:
Exception in thread "main" java.lang.IllegalStateException: Unable to find registrar for hdfs
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:427)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:494)
    at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:193)
    at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:292)
    at org.apache.beam.samples.data.ingestion.JdbcToHdfs.main(JdbcToHdfs.java:39)
I'm going to investigate tonight and will let you know.
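The exception comes from Beam resolving the output path to a filesystem by its URI scheme ("hdfs") and finding nothing registered for it. Beam discovers FileSystemRegistrar implementations through Java's ServiceLoader; the sketch below is a simplified, hand-filled stand-in for that lookup. SchemeLookupSketch, register and lookup are hypothetical names, and treating a path without a scheme as "file" is a simplification of what FileSystems does:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of scheme-based filesystem lookup (hypothetical, not Beam's FileSystems). */
public class SchemeLookupSketch {
  // Beam fills its registry via ServiceLoader; here it is filled by hand.
  private final Map<String, String> fileSystemsByScheme = new HashMap<>();

  void register(String scheme, String fileSystemName) {
    fileSystemsByScheme.put(scheme.toLowerCase(), fileSystemName);
  }

  /** Parses the scheme of a spec; a spec without a scheme is treated as a local file. */
  static String parseScheme(String spec) {
    String scheme = URI.create(spec).getScheme();
    return scheme == null ? "file" : scheme.toLowerCase();
  }

  String lookup(String spec) {
    String scheme = parseScheme(spec);
    String fs = fileSystemsByScheme.get(scheme);
    if (fs == null) {
      // Same message shape as the error in the stack trace above.
      throw new IllegalStateException("Unable to find registrar for " + scheme);
    }
    return fs;
  }

  public static void main(String[] args) {
    SchemeLookupSketch registry = new SchemeLookupSketch();
    registry.register("file", "LocalFileSystem");
    System.out.println(registry.lookup("/home/jbonofre/demo/beam/output/uc2"));
    try {
      registry.lookup("hdfs://localhost/home/jbonofre/demo/beam/output/uc1");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    // Once something handles the "hdfs" scheme, the same path resolves.
    registry.register("hdfs", "HadoopFileSystem");
    System.out.println(registry.lookup("hdfs://localhost/home/jbonofre/demo/beam/output/uc1"));
  }
}
```

In this thread the fix turned out to be making a Hadoop configuration available (via HADOOP_CONF_DIR), after which the "hdfs" scheme resolved.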
Regards
JB
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
Re: [HEADS UP] Using "new" filesystem layer
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks Luke,
I'm going to try it and will let you know.
Regards
JB
On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto
> detect Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.
Re: [HEADS UP] Using "new" filesystem layer
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Yes, I forgot to update the thread: that's what I'm doing, and it works fine.
Thanks!
Regards
JB
On 05/05/2017 08:28 PM, Lukasz Cwik wrote:
> JB, the ConfigurationLocator is the default instance factory for the
> hdfsConfiguration so as long as HADOOP_CONF_DIR/YARN_CONF_DIR is correctly
> specified, you should only need to write:
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
Re: [HEADS UP] Using "new" filesystem layer
Posted by Lukasz Cwik <lc...@google.com>.
JB, the ConfigurationLocator is the default instance factory for
hdfsConfiguration, so as long as HADOOP_CONF_DIR/YARN_CONF_DIR is correctly
set, you should only need to write:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
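The point here is that a default instance factory only runs when no value was set explicitly, so calling the locator by hand is redundant. A minimal sketch of that lazy-default pattern in plain Java; LazyDefaultSketch is a hypothetical class (not Beam's PipelineOptions machinery), and computing the default once and caching it is an assumption of this sketch:

```java
import java.util.function.Supplier;

/** An option whose default is computed by a factory only if no value was set (hypothetical). */
public class LazyDefaultSketch<T> {
  private final Supplier<T> defaultFactory;
  private T value;
  private boolean set;

  LazyDefaultSketch(Supplier<T> defaultFactory) {
    this.defaultFactory = defaultFactory;
  }

  /** An explicit value always wins; the factory is then never called. */
  void set(T v) {
    value = v;
    set = true;
  }

  T get() {
    if (!set) {
      // First read without an explicit value: run the factory once and cache the result.
      set(defaultFactory.get());
    }
    return value;
  }

  public static void main(String[] args) {
    LazyDefaultSketch<String> conf =
        new LazyDefaultSketch<>(() -> "configuration located from HADOOP_CONF_DIR");
    System.out.println(conf.get());
  }
}
```

In the Beam case the factory would play the role of ConfigurationLocator reading the environment variables; an explicit setter call would bypass it entirely.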
On Fri, May 5, 2017 at 6:23 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:
> Hi guys,
>
> thanks Luke, I updated my pipeline like this:
>
> HadoopFileSystemOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
> HadoopFileSystemOptions.ConfigurationLocator locator = new
> HadoopFileSystemOptions.ConfigurationLocator();
> List<Configuration> configurations = locator.create(options);
> Pipeline pipeline = Pipeline.create(options);
> ...
> pipeline.apply(TextIO.write().to("hdfs://localhost/path"));
>
> I defined HADOOP_CONF_DIR env variable pointing to the folder where I have
> hdfs-site.xml and it works fine.
>
> I saw that the README.md is not up to date in hadoop-file-system, I'm
> preparing a PR about that and I also preparing a quick documentation about
> HDFS support.
>
> Regards
> JB
Re: [HEADS UP] Using "new" filesystem layer
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi guys,
Thanks Luke, I updated my pipeline like this:
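import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;
HadoopFileSystemOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
// Reads HADOOP_CONF_DIR / YARN_CONF_DIR; hdfsConfiguration already defaults to this.
HadoopFileSystemOptions.ConfigurationLocator locator =
    new HadoopFileSystemOptions.ConfigurationLocator();
List<Configuration> configurations = locator.create(options);
Pipeline pipeline = Pipeline.create(options);
...
pipeline.apply(TextIO.write().to("hdfs://localhost/path"));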
I defined the HADOOP_CONF_DIR environment variable pointing to the folder containing
hdfs-site.xml, and it works fine.
I saw that the README.md in hadoop-file-system is not up to date. I'm preparing
a PR to fix that, and I'm also preparing some quick documentation about HDFS support.
Regards
JB
On 05/04/2017 06:07 PM, Lukasz Cwik wrote:
> JB, for your second point it seems as though you may not be setting the Hadoop
> configuration on HadoopFileSystemOptions.
> Also, I just merged https://github.com/apache/beam/pull/2890 which will auto
> detect Hadoop configuration based upon your HADOOP_CONF_DIR and YARN_CONF_DIR
> environment variables.
Re: [HEADS UP] Using "new" filesystem layer
Posted by Lukasz Cwik <lc...@google.com>.
JB, for your second point, it seems you may not be setting the
Hadoop configuration on HadoopFileSystemOptions.
Also, I just merged https://github.com/apache/beam/pull/2890, which
auto-detects the Hadoop configuration based on your HADOOP_CONF_DIR and
YARN_CONF_DIR environment variables.
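A rough, Beam-free sketch of that kind of auto-detection: collect the directories named by HADOOP_CONF_DIR and YARN_CONF_DIR and pick up the Hadoop site files found there. Assumptions: that the relevant files are core-site.xml and hdfs-site.xml, and that a directory named by both variables is scanned once (check the hadoop-file-system module for the exact behavior). ConfDirSketch and locateSiteFiles are hypothetical names, and the sketch only lists files instead of building Hadoop Configuration objects:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified model of locating Hadoop site files from environment variables (hypothetical). */
public class ConfDirSketch {
  static final String[] ENV_VARS = {"HADOOP_CONF_DIR", "YARN_CONF_DIR"};
  static final String[] SITE_FILES = {"core-site.xml", "hdfs-site.xml"};

  /** Returns the site files present in the configured directories, in lookup order. */
  static List<File> locateSiteFiles(Map<String, String> env) {
    // LinkedHashSet: a directory named by both variables is only scanned once.
    Set<String> dirs = new LinkedHashSet<>();
    for (String var : ENV_VARS) {
      String dir = env.get(var);
      if (dir != null && !dir.isEmpty()) {
        dirs.add(dir);
      }
    }
    List<File> found = new ArrayList<>();
    for (String dir : dirs) {
      for (String name : SITE_FILES) {
        File f = new File(dir, name);
        if (f.isFile()) {
          found.add(f);
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // Uses the real process environment; prints nothing if neither variable points at site files.
    for (File f : locateSiteFiles(System.getenv())) {
      System.out.println(f.getAbsolutePath());
    }
  }
}
```

Passing the environment as a Map (rather than reading System.getenv() inside) keeps the lookup easy to test with a temporary directory.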