Posted to commits@beam.apache.org by "Tanner Cecchetti (JIRA)" <ji...@apache.org> on 2018/07/12 19:41:00 UTC
[jira] [Comment Edited] (BEAM-4772) TextIO.read transform does not
respect .withEmptyMatchTreatment
[ https://issues.apache.org/jira/browse/BEAM-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542126#comment-16542126 ]
Tanner Cecchetti edited comment on BEAM-4772 at 7/12/18 7:40 PM:
-----------------------------------------------------------------
It looks to me like the issue is with CompressedSource. I don't think it correctly inherits the value of its delegate's emptyMatchTreatment property.
To illustrate:
{code:java}
// some arbitrary delegate -- note that EmptyMatchTreatment.ALLOW is passed
final FileBasedSource<String> delegate =
    new FileBasedSource<String>(StaticValueProvider.of("abc"), EmptyMatchTreatment.ALLOW, 0L) {
      @Override
      protected FileBasedSource<String> createForSubrangeOfFile(
          MatchResult.Metadata fileMetadata, long start, long end) { return null; }

      @Override
      protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) { return null; }
    };
// construct CompressedSource from it
final CompressedSource<String> compressedSource = CompressedSource.from(delegate);
// check what emptyMatchTreatment is set to
System.out.println(delegate.getEmptyMatchTreatment()); // <- ALLOW
System.out.println(compressedSource.getEmptyMatchTreatment()); // <- DISALLOW
{code}
That seems incorrect to me.
It seems like the fix might be to change the super() calls within CompressedSource to include the delegate's emptyMatchTreatment. For instance,
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
  // 9223372036854775807L in the decompiled source is Long.MAX_VALUE
  super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
would become
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
  // also pass the delegate's emptyMatchTreatment up to FileBasedSource
  super(
      sourceDelegate.getFileOrPatternSpecProvider(),
      sourceDelegate.getEmptyMatchTreatment(),
      Long.MAX_VALUE);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
I believe this would make my previous example "pass" (in other words, it would print ALLOW in both statements).
I'm not sure if this fixes the problem or not – I didn't recompile Beam with this change and test it. Just hoping this might be helpful.
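To show the pattern in isolation, here is a minimal, self-contained sketch that does not depend on Beam. Every type in it (BaseSource, BuggyWrapper, FixedWrapper, and the local EmptyMatchTreatment enum) is a hypothetical stand-in, not the real Beam class. It also assumes that the shorter base-class constructor falls back to DISALLOW, which is what the printed output above suggests.

```java
// Stand-in types only: a model of the constructor-forwarding pattern, not Beam code.
final class EmptyMatchForwardingSketch {
    enum EmptyMatchTreatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

    // Plays the role of FileBasedSource.
    static class BaseSource {
        private final String spec;
        private final EmptyMatchTreatment emptyMatchTreatment;
        private final long minBundleSize;

        // Assumption: the two-argument constructor defaults to DISALLOW.
        BaseSource(String spec, long minBundleSize) {
            this(spec, EmptyMatchTreatment.DISALLOW, minBundleSize);
        }

        BaseSource(String spec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
            this.spec = spec;
            this.emptyMatchTreatment = emptyMatchTreatment;
            this.minBundleSize = minBundleSize;
        }

        String getSpec() { return spec; }

        EmptyMatchTreatment getEmptyMatchTreatment() { return emptyMatchTreatment; }
    }

    // Plays the role of the current CompressedSource: the delegate's setting is dropped.
    static class BuggyWrapper extends BaseSource {
        BuggyWrapper(BaseSource delegate) {
            super(delegate.getSpec(), Long.MAX_VALUE);
        }
    }

    // Plays the role of the proposed fix: the delegate's setting is forwarded.
    static class FixedWrapper extends BaseSource {
        FixedWrapper(BaseSource delegate) {
            super(delegate.getSpec(), delegate.getEmptyMatchTreatment(), Long.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        BaseSource delegate = new BaseSource("abc", EmptyMatchTreatment.ALLOW, 0L);
        System.out.println(delegate.getEmptyMatchTreatment());                   // ALLOW
        System.out.println(new BuggyWrapper(delegate).getEmptyMatchTreatment()); // DISALLOW
        System.out.println(new FixedWrapper(delegate).getEmptyMatchTreatment()); // ALLOW
    }
}
```

This only demonstrates that forwarding the value changes what the wrapper reports. It says nothing about whether the real pipeline then behaves correctly end to end.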
> TextIO.read transform does not respect .withEmptyMatchTreatment
> ---------------------------------------------------------------
>
> Key: BEAM-4772
> URL: https://issues.apache.org/jira/browse/BEAM-4772
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.5.0
> Reporter: Samuel Waggoner
> Assignee: Kenneth Knowles
> Priority: Major
>
> I modified the MinimalWordCount example to reproduce. I expect the read transform to read 0 lines rather than give an exception, since I used EmptyMatchTreatment.ALLOW. I see the same behavior with ALLOW_IF_WILDCARD. The EmptyMatchTreatment value seems to be ignored.
> {code:java}
> public class MinimalWordCount {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     p.apply(TextIO.read()
>             .from("gs://apache-beam-samples/doesnotexist/*")
>             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at org.apache.beam.examples.MinimalWordCount.main(MinimalWordCount.java:124)
> Caused by: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
> at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
> at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
> at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:222)
> at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
> at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:91)
> at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
> {code}
> We see this behavior with both DirectRunner and DataflowRunner.
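For reference, the behavior the reporter expects from the three EmptyMatchTreatment values can be sketched as follows. This is a standalone model under stated assumptions, not Beam's implementation: the resolve and hasWildcard helpers and the local enum are hypothetical, and the wildcard check is deliberately crude.

```java
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.List;

// Stand-in model of the expected empty-match semantics; not Beam code.
final class EmptyMatchSemanticsSketch {
    enum EmptyMatchTreatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

    // Hypothetical helper: treats any of these characters as a glob wildcard.
    static boolean hasWildcard(String spec) {
        return spec.contains("*") || spec.contains("?")
            || spec.contains("[") || spec.contains("{");
    }

    // Returns the matches, or an empty list when an empty match is permitted;
    // otherwise throws, mirroring the "No files matched spec" error in the trace.
    static List<String> resolve(String spec, List<String> matches, EmptyMatchTreatment treatment)
            throws FileNotFoundException {
        if (!matches.isEmpty()) {
            return matches;
        }
        boolean allowed = treatment == EmptyMatchTreatment.ALLOW
            || (treatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD && hasWildcard(spec));
        if (!allowed) {
            throw new FileNotFoundException("No files matched spec: " + spec);
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) throws Exception {
        String spec = "gs://apache-beam-samples/doesnotexist/*";
        List<String> none = Collections.emptyList();
        System.out.println(resolve(spec, none, EmptyMatchTreatment.ALLOW).size());             // 0
        System.out.println(resolve(spec, none, EmptyMatchTreatment.ALLOW_IF_WILDCARD).size()); // 0
        try {
            resolve(spec, none, EmptyMatchTreatment.DISALLOW);
        } catch (FileNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this model, only DISALLOW should produce the FileNotFoundException for the wildcard spec in the report. The stack trace above is what one would expect if DISALLOW were in effect regardless of the configured value.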