You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 01:53:02 UTC

[GitHub] [beam] kennknowles opened a new issue, #19485: Java SDK : How to bounded messages from PubSub ?

kennknowles opened a new issue, #19485:
URL: https://github.com/apache/beam/issues/19485

   I want to set up a dataflow Pipeline in Streaming mode.
   
   My dataflow do theses tasks :
    * Read messages from pubsub
    * Build my "Document" object
    * Insert this Document into BigQuery
    * Store the initial message into Google Cloud Storage
   
   The code successfully build and run, but it takes a lot of time for some messages.
   
   I think it takes a lot of time because it treats pusub message one by one :
   
    -\>  I build a PCollection<Documents\> with messages from pubsub.
   ```
   
   PCollection<Documents> rawtickets = pipeline
           .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
   
          .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        
     .triggering(Repeatedly.forever(
                   AfterPane.elementCountAtLeast(1000))
           ).withAllowedLateness(Duration.standardSeconds(300))
   
          .discardingFiredPanes()
           )
           .apply("Make Document ", ParDo.of(new DoFn<String,
   Documents>() {
                      @ProcessElement    
                      public void processElement(ProcessContext
   c) throws Exception {
                                        String rawXMl = c.element();
            
                              /** code for build my object "Document" **/
                              
           Documents docTest = new Documents();
                                       docTest.init(rawXMl);
   
   
                                       c.output(docTest);
                                        }
      
                                }
           ))
           .setCoder(AvroCoder.of(Documents.class));
   
   
   ```
   
    
   
   Here a picture of my complete process :
   
   [Complete Process ](https://zupimages.net/up/19/14/w4df.png)
   
   We can see the latency of the process after reading messages.
   
   Concretely, I search to create a Pcollection of N elements. I have tried methods founds here : [https://beam.apache.org/documentation/programming-guide/#triggers](https://beam.apache.org/documentation/programming-guide/#triggers) but it doesn't group  my pubsub messages.
   
   How I can batch this process  ?
   
    
   
   EDIT : I think I must use "GroupByKey"  after a window, but that return an error :
   ```
   
   PCollection<Documents> rawtickets = pipeline
           .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
   
          .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        
             .triggering(Repeatedly.forever(
                           AfterPane.elementCountAtLeast(1000))
   
                  ).withAllowedLateness(Duration.standardSeconds(300))
                   .discardingFiredPanes()
   
             )
          .apply("Make Document ", ParDo.of(new DoFn<String, Documents>() {
               
         @ProcessElement    
                      public void processElement(ProcessContext c) throws Exception
   {
                                        String rawXMl = c.element();
                                
          /** code for build my object "Document" **/
                                       Documents docTest
   = new Documents();
                                       docTest.init(rawXMl);
   
                     
                     c.output(docTest);
                                        }
                        
              }
           ))
           .setCoder(AvroCoder.of(Documents.class))
           .apply("GroupByKey",GroupByKey.<Integer,Documents>create());
   
   ```
   
   Error : Wrong 2nd argument type. Found: 'org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents\>', required: 'org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents\>,OutputT\>' less... Inspection info: apply (String, org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents\>,OutputT\>) in PCollection cannot be applied to (String, org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents\>)  
   
    
   
    
   
   Imported from Jira [BEAM-6970](https://issues.apache.org/jira/browse/BEAM-6970). Original Jira may contain additional context.
   Reported by: chloethonin.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org