Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/15 17:26:01 UTC
[jira] [Commented] (BEAM-13129) [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic
[ https://issues.apache.org/jira/browse/BEAM-13129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507096#comment-17507096 ]
Beam JIRA Bot commented on BEAM-13129:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> [Streaming][PubSub Lite][DataflowRunner] PubSub Lite IO doesn't sink message to PubSub Lite topic
> -------------------------------------------------------------------------------------------------
>
> Key: BEAM-13129
> URL: https://issues.apache.org/jira/browse/BEAM-13129
> Project: Beam
> Issue Type: Bug
> Components: extensions-java-gcp
> Affects Versions: 2.30.0, 2.31.0, 2.32.0, 2.33.0
> Environment: GCP Dataflow
> Reporter: David Duarte
> Priority: P2
> Labels: stale-P2
>
> We are currently using PubSub Lite IO with the Dataflow runner.
> Our Beam job runs in streaming mode.
> Reading from a PubSub Lite subscription works correctly.
> Writing to a PubSub Lite topic doesn't work with this runner.
> When we look at the job graph for the PubSub Lite write step (transform: org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteSink) in the Google Cloud Console, we don't see any writes.
> When we check the topic, we don't see any output.
> The code works on Beam versions 2.27, 2.28, and 2.29.
> Here is the code we used to check each of the following versions:
> * 2.27
> * 2.28
> * 2.29
> * 2.30
> * 2.31
> * 2.32
> * 2.33
>
> {code:java}
> PubSubStreamingWriteJobOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubStreamingWriteJobOptions.class);
> options.setStreaming(true);
> // Set up the file system
> FileSystems.setDefaultPipelineOptions(options);
> TopicPath topicPath = TopicPath.newBuilder()
> .setProject(ProjectId.of("[PROJECT ID]"))
> .setLocation(CloudZone.of(CloudRegion.of("[REGION]"), "[ZONE CHAR]"))
> .setName(TopicName.of("[TOPIC ID]"))
> .build();
> PublisherOptions publisherOptions =
> PublisherOptions.newBuilder()
> .setTopicPath(topicPath)
> .build();
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(TextIO.read()
> .from("gs://[BUCKET]/[OBJECT_PREFIX]*")
> .watchForNewFiles(
> Duration.standardMinutes(1),
> Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
> .apply(CREATE_PUB_SUB_LITE_MESSAGE_STEP, MapElements.into(TypeDescriptor.of(PubSubMessage.class)).via(file -> {
> Instant instant = Instant.now();
> Message message =
> Message.builder()
> .setData(ByteString.copyFromUtf8("message " + file))
> .setEventTime(Timestamp.newBuilder()
> .setNanos(instant.getNano())
> .setSeconds(instant.getEpochSecond())
> .build())
> .build();
> return message.toProto();
> }))
> .apply(SINK_PUB_SUB_LITE_MESSAGES_STEP, PubsubLiteIO.write(publisherOptions));
> pipeline.run();
> {code}
> Can you help us find the issue and fix it in Beam, please?
>
> Best regards,
> David Duarte
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)