You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 21:53:15 UTC

[GitHub] [beam] damccorm opened a new issue, #21202: [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic

damccorm opened a new issue, #21202:
URL: https://github.com/apache/beam/issues/21202

   We are currently using PubSub Lite IO with Dataflow Runner.
   
   Our Beam job is on streaming mode.
   
   The read from a PubSub Lite subscription works correctly.
   
   The sink to a PubSub topic doesn't work with the runner. 
   
   When we take a look  on the job graph for the PubSubLite Write step (transform : org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink ) in the Google Cloud Console we don't see any writes.
   
   When we check the topic we don't see any outputs.
   
   Code works well on 2.27, 2.28, 2.29 Beam version.
   
   Here is the code we used to do the check on version:
    * 2.27 
    * 2.28
    * 2.29
    * 2.30
    * 2.31
    * 2.33
    * 2.33
   
    
   ```
   
   // PubSubStreamingWriteJobOptions options =
           PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubStreamingWriteJobOptions.class);
   
   options.setStreaming(true);
   
   //set
   up file system
   FileSystems.setDefaultPipelineOptions(options);
   
   
   
   
   TopicPath topicPath = TopicPath.newBuilder()
   
          .setProject(ProjectId.of("[PROJECT ID]"))
           .setLocation(CloudZone.of(CloudRegion.of("[REGION]"),
   "[ZONE CHAR]"))
           .setName(TopicName.of("[TOPIC ID]"))
           .build();
   
   
   PublisherOptions
   publisherOptions =
           PublisherOptions.newBuilder()
                   .setTopicPath(topicPath)
   
                  .build();
   
   Pipeline pipeline = Pipeline.create(options);
   
   pipeline.apply(TextIO.read()
   
                  .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
                   .watchForNewFiles(
          
                   Duration.standardMinutes(1),
                           Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
   
          .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP, MapElements.into(TypeDescriptor.of(PubSubMessage.class)).via(file
   -> {
               Instant instant = Instant.now();
               Message message =
                   
      Message.builder()
                               .setData(ByteString.copyFromUtf8("message " + file))
   
                              .setEventTime(Timestamp.newBuilder()
                                     
    .setNanos(instant.getNano())
                                       .setSeconds(instant.getEpochSecond())
   
                                      .build())
                               .build();
   
               return
   message.toProto();
           }))
           .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP, PubsubLiteIO.write(publisherOptions));
   pipeline.run();
   
   ```
   
   Can you help us to found the issue and fix the Beam version please?
   
    
   
   Best regards,
   
   David Duarte
   
    
   
    
   
   Imported from Jira [BEAM-13129](https://issues.apache.org/jira/browse/BEAM-13129). Original Jira may contain additional context.
   Reported by: dduarte.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org