Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 22:30:27 UTC

[GitHub] [beam] damccorm opened a new issue, #21257: Either Create or DirectRunner fails to produce all elements to the following transform

damccorm opened a new issue, #21257:
URL: https://github.com/apache/beam/issues/21257

   The following pipeline fails to print all numbers from 1 to 100 when run on the DirectRunner in streaming mode.
   
   This was identified while implementing org.apache.beam.sdk.io.gcp.pubsublite.ReadWriteIT, which uses a workaround for the bug in either Create or the DirectRunner. Minimal reproduction:
   
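   ```
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.transforms.SimpleFunction;
   import org.apache.beam.sdk.values.PCollection;
   
   // Hypothetical wrapper class name, added so the snippet compiles on its own.
   public class CreateRepro {
     private static final int COUNT = 100;
   
     // Counts how many elements have reached the "createMessages" step.
     private static final AtomicInteger CREATED_COUNT = new AtomicInteger();
   
     // `pipeline` is expected to run on the DirectRunner in streaming mode.
     public static void run(Pipeline pipeline) {
       PCollection<Integer> indexes =
           pipeline.apply(
               "createIndexes",
               Create.of(IntStream.range(0, COUNT).boxed().collect(Collectors.toList())));
   
       indexes.apply(
           "createMessages",
           MapElements.via(
               new SimpleFunction<Integer, Integer>(
                   index -> {
                     // Side effect: should print 1..COUNT, once per element.
                     System.err.println("Created message index " + CREATED_COUNT.incrementAndGet());
                     return index;
                   }) {}));
   
       pipeline.run().waitUntilFinish(); // Never terminates
     }
   }
   ```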
   
   
   Imported from Jira [BEAM-12867](https://issues.apache.org/jira/browse/BEAM-12867). Original Jira may contain additional context.
   Reported by: dpcollins-google.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] kennknowles commented on issue #21257: Either Create or DirectRunner fails to produce all elements to the following transform

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21257:
URL: https://github.com/apache/beam/issues/21257#issuecomment-1246007492

   @dpcollins-google per my prior comment, can you confirm with a PAssert whether elements are actually going unprocessed? Alternatively, you may take the position that all side effects of a DoFn should have occurred before the pipeline shuts down. All in all, I am not convinced this is P1, but it could be a real bug.
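   One way to run the PAssert check suggested above is sketched below. It assumes Beam's core Java SDK and the DirectRunner are on the classpath; the class name `CreateSideEffectCheck` and its `runOnce` helper are hypothetical, not part of Beam or of ReadWriteIT. It keeps the reporter's `Create` followed by a side-effecting map step, asserts the output with `PAssert`, and reads the side-effect counter only after `waitUntilFinish()` returns, so lost elements and unobserved side effects fail in different ways:
   
   ```java
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.options.StreamingOptions;
   import org.apache.beam.sdk.testing.PAssert;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.TypeDescriptors;
   
   /** Hypothetical helper: separates "elements missing" from "side effects missing". */
   public class CreateSideEffectCheck {
     static final int COUNT = 100;
   
     // Side-effect counter. A static field is shared only because the DirectRunner
     // executes the pipeline in the same JVM as main().
     static final AtomicInteger CREATED_COUNT = new AtomicInteger();
   
     /** Runs the repro once and returns how many times the map step was invoked. */
     static int runOnce() {
       CREATED_COUNT.set(0);
       PipelineOptions options = PipelineOptionsFactory.create();
       options.as(StreamingOptions.class).setStreaming(true);
       Pipeline pipeline = Pipeline.create(options);
   
       List<Integer> indexes = IntStream.range(0, COUNT).boxed().collect(Collectors.toList());
       PCollection<Integer> messages =
           pipeline
               .apply("createIndexes", Create.of(indexes))
               .apply(
                   "createMessages",
                   MapElements.into(TypeDescriptors.integers())
                       .via(
                           (Integer index) -> {
                             // Side effect under test: one increment per processed element.
                             CREATED_COUNT.incrementAndGet();
                             return index;
                           }));
   
       // Fails the pipeline (an exception surfaces from run() or waitUntilFinish())
       // if any element is missing from, or duplicated in, the output.
       PAssert.that(messages).containsInAnyOrder(indexes);
   
       pipeline.run().waitUntilFinish();
       return CREATED_COUNT.get();
     }
   
     public static void main(String[] args) {
       int runs = args.length > 0 ? Integer.parseInt(args[0]) : 1;
       for (int run = 0; run < runs; run++) {
         int invoked = runOnce();
         // PAssert passed, so the output was complete; a count other than COUNT
         // would point at the side effect, not at lost elements.
         if (invoked != COUNT) {
           throw new IllegalStateException(
               "run " + run + ": " + invoked + " side effects, expected " + COUNT);
         }
       }
       System.out.println("All " + runs + " run(s) produced " + COUNT + " elements and side effects");
     }
   }
   ```
   
   Passing a run count as the first argument (for example `100`) repeats the check several times in one JVM. The static counter only works because the DirectRunner executes the pipeline in the same JVM as `main`; on a distributed runner, a Beam `Metrics` counter would be the usual way to observe such a count.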


[GitHub] [beam] johnjcasey commented on issue #21257: Either Create or DirectRunner fails to produce all elements to the following transform

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on issue #21257:
URL: https://github.com/apache/beam/issues/21257#issuecomment-1284352355

   I attempted to reproduce this:
   ```
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.options.StreamingOptions;
   import org.apache.beam.sdk.testing.ExpectedLogs;
   import org.apache.beam.sdk.testing.PAssert;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.values.PCollection;
   import org.junit.Rule;
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.junit.runners.Parameterized;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   @RunWith(Parameterized.class)
   public class SimpleStreamingTest {
   
     @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(SimpleStreamingTest.class);
   
     private static final Logger LOG = LoggerFactory.getLogger(SimpleStreamingTest.class);
   
     private static final int COUNT = 100;
   
     @Parameterized.Parameters
     public static Object[][] data() {
       return new Object[100][0];
     }
   
     @Test
     public void testDoFnSideEffects(){
       PipelineOptions options = PipelineOptionsFactory.create();
       options.as(StreamingOptions.class).setStreaming(true);
       Pipeline testPipeline = Pipeline.create(options);
       List<Integer> indexes = IntStream.range(0, COUNT).boxed().collect(Collectors.toList());
       PCollection<Integer> values = testPipeline.apply( "createIndexes", Create.of(indexes))
           .apply("createMessages", ParDo.of(new Counter()));
   
       PAssert.that(values).containsInAnyOrder(indexes);
   
       testPipeline.run().waitUntilFinish();
   
       for(int i : indexes){
         expectedLogs.verifyError(String.valueOf(i));
       }
     }
   
     private static class Counter extends DoFn<Integer,Integer>{
   
       @ProcessElement
       public void processElement(@Element Integer i, OutputReceiver<Integer> receiver){
         LOG.error(String.valueOf(i));
         receiver.output(i);
       }
     }
   }
   ```
   Running the above 100 times did not produce any failures, so I believe this issue no longer reproduces.


[GitHub] [beam] johnjcasey closed issue #21257: Either Create or DirectRunner fails to produce all elements to the following transform

Posted by GitBox <gi...@apache.org>.
johnjcasey closed issue #21257: Either Create or DirectRunner fails to produce all elements to the following transform
URL: https://github.com/apache/beam/issues/21257

