Posted to dev@beam.apache.org by Borisa Zivkovic <bo...@gmail.com> on 2017/06/14 13:16:10 UTC

TextIO - writing to directory, possible problem

Hi everyone,

I lost some time figuring out why TextIO was not writing data for me.

I have this:

stringsPCollection
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply("WritingToOutput",
        TextIO.write()
            .withWindowedWrites()
            .withFilenamePolicy(new WindowedFilenamePolicy("my_pref"))
            .to("/tmp/")
            .withNumShards(1));

Where WindowedFilenamePolicy is the one found in
org.apache.beam.sdk.io.AvroIOTest.

I expected output files to be written to the /tmp/ directory, but they are
not. They are written to the root directory. Is this expected behaviour?

I did some analysis and here are a few findings:

I think the problem is in
org.apache.beam.sdk.io.FileBasedSink.ExtractDirectory ...

The following small main method shows the problem:

import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

public class Test {

  public static void main(String[] args) {
    ResourceId res = FileBasedSink.convertToFileResourceIfPossible("/tmp/");
    System.out.println("Resource is " + res
        + ", current directory is " + res.getCurrentDirectory()
        + ", filename is " + res.getFilename());
    FileBasedSink<String> mockFBS =
        new FileBasedSink<String>(StaticValueProvider.of(res), null) {

      @Override
      public FileBasedSink.WriteOperation<String> createWriteOperation() {
        return null;
      }
    };

    final ValueProvider<ResourceId> provider =
        mockFBS.getBaseOutputDirectoryProvider();

    System.out.println("BaseOutputProvider is " + provider
        + ", isAccessible=" + provider.isAccessible()
        + ", getValue=" + provider.get());
  }

}

The output is

Resource is /tmp, current directory is //, filename is tmp
BaseOutputProvider is NestedValueProvider{value=StaticValueProvider{value=/tmp}}, isAccessible=true, getValue=//
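
For comparison, the same split can be reproduced with plain java.nio, without any Beam code. This is only a sketch under the assumption that the local path handling behaves like java.nio.file.Path on a POSIX file system; the class name TrailingSlashDemo and its helper methods are made up for illustration and are not part of Beam. The trailing slash is dropped when the string is parsed, so "/tmp/" becomes the entry "tmp" inside "/", which matches the shape of the output above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical demo class, not part of Beam. It only uses java.nio.
public class TrailingSlashDemo {

  // String form of the parsed path. On POSIX the trailing '/' is dropped.
  static String normalized(String spec) {
    return Paths.get(spec).toString();
  }

  // Parent directory of the parsed path, or "" if there is none.
  static String parentOf(String spec) {
    Path parent = Paths.get(spec).getParent();
    return parent == null ? "" : parent.toString();
  }

  // Last name element of the parsed path, or "" if there is none.
  static String fileNameOf(String spec) {
    Path name = Paths.get(spec).getFileName();
    return name == null ? "" : name.toString();
  }

  public static void main(String[] args) {
    String spec = "/tmp/";
    System.out.println("path=" + normalized(spec)
        + ", parent=" + parentOf(spec)
        + ", filename=" + fileNameOf(spec));
  }
}
```

If the sink takes the parent of such a path as its base output directory, a directory given as "/tmp/" would end up being treated as a file named tmp under the root directory.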

But I think this is wrong - or at least confusing.

Thoughts?

cheers
Borisa

Re: TextIO - writing to directory, possible problem

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
This looks wrong. I filed https://issues.apache.org/jira/browse/BEAM-2448
