Posted to commits@beam.apache.org by "Tanner Cecchetti (JIRA)" <ji...@apache.org> on 2018/07/12 19:41:00 UTC

[jira] [Comment Edited] (BEAM-4772) TextIO.read transform does not respect .withEmptyMatchTreatment

    [ https://issues.apache.org/jira/browse/BEAM-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542126#comment-16542126 ] 

Tanner Cecchetti edited comment on BEAM-4772 at 7/12/18 7:40 PM:
-----------------------------------------------------------------

It looks to me like the issue is with CompressedSource. I don't think it correctly inherits the value of its delegate's emptyMatchTreatment property.

To illustrate:
{code:java}
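// imports needed to compile this snippet
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

// some arbitrary delegate -- note that EmptyMatchTreatment.ALLOW is passed
final FileBasedSource<String> delegate = new FileBasedSource<String>(StaticValueProvider.of("abc"), EmptyMatchTreatment.ALLOW, 0L) {
    @Override
    protected FileBasedSource<String> createForSubrangeOfFile(MatchResult.Metadata fileMetadata, long start, long end) {return null;}
    @Override
    protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {return null;}
};

// construct a CompressedSource that wraps it
final CompressedSource<String> compressedSource = CompressedSource.from(delegate);

// check what emptyMatchTreatment is set to
System.out.println(delegate.getEmptyMatchTreatment()); // <- ALLOW
System.out.println(compressedSource.getEmptyMatchTreatment()); // <- DISALLOW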
{code}
That seems incorrect to me, since the delegate was explicitly given ALLOW.

A possible fix would be to have the super() calls in CompressedSource's constructors pass along the delegate's emptyMatchTreatment. For instance, this constructor
{code:java}
private CompressedSource(FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
    super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
    this.sourceDelegate = sourceDelegate;
    this.channelFactory = channelFactory;
}
{code}
would become
{code:java}
private CompressedSource(FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
    super(sourceDelegate.getFileOrPatternSpecProvider(), sourceDelegate.getEmptyMatchTreatment(), Long.MAX_VALUE);
    this.sourceDelegate = sourceDelegate;
    this.channelFactory = channelFactory;
}
{code}
I believe this would make my previous example "pass", i.e. print ALLOW for both statements.

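A Beam-free model may make the suspected mechanism easier to see. BaseSource, ForgetfulWrapper, ForwardingWrapper and the local EmptyMatchTreatment enum below are hypothetical stand-ins, not Beam classes, and the sketch assumes (consistent with the DISALLOW printed by the first example) that FileBasedSource's shorter super() constructor falls back to DISALLOW.

{code:java}
// Hypothetical, dependency-free model: none of these classes are Beam's.
public class DelegationSketch {
  // Local stand-in for Beam's EmptyMatchTreatment enum.
  enum EmptyMatchTreatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  // Plays the role of FileBasedSource.
  static class BaseSource {
    private final String spec;
    private final EmptyMatchTreatment emptyMatchTreatment;

    // Shorter constructor: assumed to fall back to DISALLOW.
    BaseSource(String spec) {
      this(spec, EmptyMatchTreatment.DISALLOW);
    }

    BaseSource(String spec, EmptyMatchTreatment emptyMatchTreatment) {
      this.spec = spec;
      this.emptyMatchTreatment = emptyMatchTreatment;
    }

    String getSpec() {
      return spec;
    }

    EmptyMatchTreatment getEmptyMatchTreatment() {
      return emptyMatchTreatment;
    }
  }

  // Like the current CompressedSource constructor: forwards only the spec.
  static class ForgetfulWrapper extends BaseSource {
    final BaseSource delegate;

    ForgetfulWrapper(BaseSource delegate) {
      super(delegate.getSpec());
      this.delegate = delegate;
    }
  }

  // Like the proposed fix: forwards the delegate's treatment as well.
  static class ForwardingWrapper extends BaseSource {
    final BaseSource delegate;

    ForwardingWrapper(BaseSource delegate) {
      super(delegate.getSpec(), delegate.getEmptyMatchTreatment());
      this.delegate = delegate;
    }
  }

  public static void main(String[] args) {
    BaseSource delegate = new BaseSource("abc", EmptyMatchTreatment.ALLOW);
    System.out.println(new ForgetfulWrapper(delegate).getEmptyMatchTreatment()); // prints DISALLOW
    System.out.println(new ForwardingWrapper(delegate).getEmptyMatchTreatment()); // prints ALLOW
  }
}
{code}

Running main should print DISALLOW and then ALLOW: a wrapper that forwards only the file spec silently picks up the base-class default, while forwarding the treatment explicitly keeps ALLOW.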
 

I'm not sure whether this fixes the problem: I haven't recompiled Beam with this change or tested it. I just hope this is helpful.


> TextIO.read transform does not respect .withEmptyMatchTreatment
> ---------------------------------------------------------------
>
>                 Key: BEAM-4772
>                 URL: https://issues.apache.org/jira/browse/BEAM-4772
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.5.0
>            Reporter: Samuel Waggoner
>            Assignee: Kenneth Knowles
>            Priority: Major
>
> I modified the MinimalWordCount example to reproduce the problem. Since I used EmptyMatchTreatment.ALLOW, I expect the read transform to read 0 lines rather than throw an exception. I see the same behavior with ALLOW_IF_WILDCARD. The EmptyMatchTreatment value seems to be ignored.
> {code:java}
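> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class MinimalWordCount {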
>  public static void main(String[] args) {
>    PipelineOptions options = PipelineOptionsFactory.create();
>    Pipeline p = Pipeline.create(options);
>    p.apply(TextIO.read()
>      .from("gs://apache-beam-samples/doesnotexist/*")
>      .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>     .apply(TextIO.write().to("wordcounts"));
>    p.run().waitUntilFinish();
>  }
> }
> {code}
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>  at org.apache.beam.examples.MinimalWordCount.main(MinimalWordCount.java:124)
> Caused by: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>  at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
>  at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
>  at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:222)
>  at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
>  at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:91)
>  at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81){code}
> We see this behavior with both the DirectRunner and the DataflowRunner.

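The trace above fails in FileSystems.maybeAdjustEmptyMatchResult. As a rough, dependency-free sketch of how each EmptyMatchTreatment value is presumably meant to treat an empty match, consider the model below; adjustEmptyMatch, hasGlobWildcard and the local enum are hypothetical names, and the wildcard check is a simplification, not Beam's exact rule.

{code:java}
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.List;

public class EmptyMatchSketch {
  // Local stand-in for Beam's EmptyMatchTreatment enum.
  enum EmptyMatchTreatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  // Simplified (hypothetical) wildcard check: *, ? or { anywhere in the spec.
  static boolean hasGlobWildcard(String spec) {
    return spec.contains("*") || spec.contains("?") || spec.contains("{");
  }

  // Returns non-empty matches unchanged; for an empty match, returns an empty
  // list if the treatment allows it, otherwise throws like the trace above.
  static List<String> adjustEmptyMatch(
      String spec, List<String> matches, EmptyMatchTreatment treatment)
      throws FileNotFoundException {
    if (!matches.isEmpty()) {
      return matches;
    }
    boolean allowed =
        treatment == EmptyMatchTreatment.ALLOW
            || (treatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD && hasGlobWildcard(spec));
    if (allowed) {
      return Collections.emptyList();
    }
    throw new FileNotFoundException("No files matched spec: " + spec);
  }

  public static void main(String[] args) {
    String spec = "gs://apache-beam-samples/doesnotexist/*";
    List<String> none = Collections.emptyList();
    for (EmptyMatchTreatment t : EmptyMatchTreatment.values()) {
      try {
        System.out.println(t + " -> " + adjustEmptyMatch(spec, none, t).size() + " files");
      } catch (FileNotFoundException e) {
        System.out.println(t + " -> " + e.getMessage());
      }
    }
  }
}
{code}

With the reported spec, which ends in *, this model returns an empty result for ALLOW and ALLOW_IF_WILDCARD and throws "No files matched spec" only for DISALLOW. That matches the trace if the source actually consulted ends up with DISALLOW, regardless of what was passed to withEmptyMatchTreatment.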


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)