Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/22 07:48:28 UTC

[GitHub] [beam] tiwarinaman commented on issue #21608: Allow skipping of any output when writing an empty PCollection.

tiwarinaman commented on issue #21608:
URL: https://github.com/apache/beam/issues/21608#issuecomment-1192286626

   I saw that the new version (2.40.0) adds a method called **skipIfEmpty()**, which is available in the **writeCustomType()** implementation, as you can see in the code below:
   
   ```java
   errorRecords.apply("WritingErrorRecords", TextIO.<String>writeCustomType().to(options.getBucketPath())
                .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
                .skipIfEmpty()
                .withoutSharding()
                .withSuffix(".txt")
                .withShardNameTemplate("-SSS")
                .withNumShards(1));
   ```
   
   This code works when the PCollection is empty, which is exactly what we wanted. However, when the PCollection is not empty, it throws the NullPointerException pasted below.
   
   ```
   Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.transforms.SerializableFunction.apply(Object)" because "this.formatFunction" is null
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
   	at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:97)
   	at org.apache.beam.examples.WordCount.main(WordCount.java:113)
   Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.transforms.SerializableFunction.apply(Object)" because "this.formatFunction" is null
   	at org.apache.beam.sdk.io.DynamicFileDestinations$ConstantFilenamePolicy.formatRecord(DynamicFileDestinations.java:51)
   	at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:953)
   ```
   
   I believe this happens because, when the **AutoValue_TextIO_TypedWrite** object is built, **DynamicDestinations** is initially set to null in its setter and the value is never updated afterwards.
   I'm referring to the following Apache Beam code in the TextIO file:
   
   ```java
   public static <UserT> TypedWrite<UserT, Void> writeCustomType() {
       return (new AutoValue_TextIO_TypedWrite.Builder())
               .setFilenamePrefix((ValueProvider) null).setTempDirectory((ValueProvider) null)
               .setShardTemplate((String) null).setFilenameSuffix((String) null)
               .setFilenamePolicy((FileBasedSink.FilenamePolicy) null).setDynamicDestinations((FileBasedSink.DynamicDestinations) null)
               .setDelimiter(new char[]{'\n'}).setWritableByteChannelFactory(org.apache.beam.sdk.io.FileBasedSink.CompressionType.UNCOMPRESSED)
               .setWindowedWrites(false).setNoSpilling(false).setSkipIfEmpty(false).build();
   }
   ```
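   One detail worth checking: the builder chain above never sets a format function at all, and the exception message names `this.formatFunction` inside `ConstantFilenamePolicy.formatRecord`, not the destinations. Assuming `TextIO.TypedWrite` applies that function to every record before writing it (which is what the stack trace suggests), the empty case would succeed simply because no record is ever formatted, independent of `skipIfEmpty()`. The sketch below is a plain-Java model of that behavior, not Beam code: `FormatFunctionModel` and `ModelTypedWrite` are hypothetical names. In the real pipeline, the corresponding fix would presumably be adding something like `.withFormatFunction(record -> record)` to the `writeCustomType()` chain.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Function;

   /** Hypothetical plain-Java model of the failure above; this is NOT Beam's TextIO. */
   public class FormatFunctionModel {

     /** Hypothetical stand-in for a custom-type write whose format function starts out null. */
     static final class ModelTypedWrite<T> {
       private Function<T, String> formatFunction; // stays null unless withFormatFunction is called
       private boolean skipIfEmpty;

       ModelTypedWrite<T> withFormatFunction(Function<T, String> fn) {
         this.formatFunction = fn;
         return this;
       }

       ModelTypedWrite<T> skipIfEmpty() {
         this.skipIfEmpty = true;
         return this;
       }

       /** Returns the formatted lines, or Optional.empty() when the write is skipped. */
       Optional<List<String>> write(List<T> records) {
         if (records.isEmpty() && skipIfEmpty) {
           return Optional.empty(); // no output at all
         }
         List<String> lines = new ArrayList<>();
         for (T record : records) {
           // Like formatRecord() in the trace: throws NullPointerException if formatFunction is null.
           lines.add(formatFunction.apply(record));
         }
         return Optional.of(lines);
       }
     }

     public static void main(String[] args) {
       List<String> errors = List.of("1|E01|bad row");

       // Empty input: nothing is formatted, so the missing function goes unnoticed.
       System.out.println(new ModelTypedWrite<String>().skipIfEmpty().write(List.of()));

       // Non-empty input without a format function fails, with or without skipIfEmpty().
       try {
         new ModelTypedWrite<String>().skipIfEmpty().write(errors);
       } catch (NullPointerException e) {
         System.out.println("NullPointerException: format function was never set");
       }

       // Supplying a format function (Beam's counterpart: TypedWrite.withFormatFunction) avoids it.
       System.out.println(
           new ModelTypedWrite<String>().withFormatFunction(record -> record).skipIfEmpty().write(errors));
     }
   }
   ```

   In this model the empty and non-empty paths differ only in whether the format function is ever invoked, which mirrors the observation that the pipeline succeeds on an empty PCollection and fails on a non-empty one.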
   
   Can somebody help me solve this problem? I'm working on a project where this issue is a **priority**.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org