You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/15 17:26:00 UTC

[jira] [Updated] (BEAM-13129) [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic

     [ https://issues.apache.org/jira/browse/BEAM-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-13129:
---------------------------------
    Labels: stale-P2  (was: )

> [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-13129
>                 URL: https://issues.apache.org/jira/browse/BEAM-13129
>             Project: Beam
>          Issue Type: Bug
>          Components: extensions-java-gcp
>    Affects Versions: 2.30.0, 2.31.0, 2.32.0, 2.33.0
>         Environment: GCP Dataflow
>            Reporter: David Duarte
>            Priority: P2
>              Labels: stale-P2
>
> We are currently using PubSub Lite IO with Dataflow Runner.
> Our Beam job is on streaming mode.
> The read from a PubSub Lite subscription works correctly.
> The sink to a PubSub topic doesn't work with the runner. 
> When we take a look  on the job graph for the PubSubLite Write step (transform : org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink ) in the Google Cloud Console we don't see any writes.
> When we check the topic we don't see any outputs.
> Code works well on 2.27, 2.28, 2.29 Beam version.
> Here is the code we used to do the check on version:
>  * 2.27 
>  * 2.28
>  * 2.29
>  * 2.30
>  * 2.31
>  * 2.33
>  * 2.33
>  
> {code:java}
> // PubSubStreamingWriteJobOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubStreamingWriteJobOptions.class);
> options.setStreaming(true);
> //set up file system
> FileSystems.setDefaultPipelineOptions(options);
> TopicPath topicPath = TopicPath.newBuilder()
>         .setProject(ProjectId.of("[PROJECT ID]"))
>         .setLocation(CloudZone.of(CloudRegion.of("[REGION]"), "[ZONE CHAR]"))
>         .setName(TopicName.of("[TOPIC ID]"))
>         .build();
> PublisherOptions publisherOptions =
>         PublisherOptions.newBuilder()
>                 .setTopicPath(topicPath)
>                 .build();
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(TextIO.read()
>                 .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
>                 .watchForNewFiles(
>                         Duration.standardMinutes(1),
>                         Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
>         .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP, MapElements.into(TypeDescriptor.of(PubSubMessage.class)).via(file -> {
>             Instant instant = Instant.now();
>             Message message =
>                     Message.builder()
>                             .setData(ByteString.copyFromUtf8("message " + file))
>                             .setEventTime(Timestamp.newBuilder()
>                                     .setNanos(instant.getNano())
>                                     .setSeconds(instant.getEpochSecond())
>                                     .build())
>                             .build();
>             return message.toProto();
>         }))
>         .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP, PubsubLiteIO.write(publisherOptions));
> pipeline.run();
> {code}
> Can you help us to found the issue and fix the Beam version please?
>  
> Best regards,
> David Duarte
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)