Posted to issues@beam.apache.org by "Boyuan Zhang (Jira)" <ji...@apache.org> on 2021/07/02 21:53:00 UTC

[jira] [Updated] (BEAM-12494) Dataflow Kafka Job not triggering for external subnet

     [ https://issues.apache.org/jira/browse/BEAM-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyuan Zhang updated BEAM-12494:
--------------------------------
    Resolution: Won't Fix
        Status: Resolved  (was: Open)

> Dataflow Kafka Job not triggering for external subnet
> -----------------------------------------------------
>
>                 Key: BEAM-12494
>                 URL: https://issues.apache.org/jira/browse/BEAM-12494
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.28.0
>         Environment: IntelliJ community version, Maven, Windows, Dataflow version 2.28.0
>            Reporter: Jasminder pal singh sehgal
>            Priority: P2
>             Fix For: Not applicable
>
>         Attachments: CodeSnippet.JPG, LogsStreamingEngine.txt, SuccessfulJobRun-KafkaIngestion.txt, TimeOutLogs_KafkaIngestion.txt, image-2021-06-16-16-54-25-161.png, image-2021-06-16-16-55-57-509.png, image-2021-06-20-22-20-24-363.png, image-2021-06-20-22-23-14-052.png, image-2021-06-21-15-00-09-851.png
>
>
> Hello,
> Our team is having trouble launching, from IntelliJ, a streaming Dataflow Kafka job that runs on a private subnet.
> The hypothesis is that during graph construction time [0], Beam tries to execute the code locally and check all the connections. In our case, we don't have access to the subnet from IntelliJ or from the Cloud console. We do have access when a Compute Engine instance is created within that subnet.
> We reached out to Google support and they suggested that we raise a defect with you. The following code throws a *time-out* error when we execute it from IntelliJ.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>         .withConsumerConfigUpdates(propertyBuilder)
>         .withConsumerConfigUpdates(
>                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
>         )
>         .withBootstrapServers(options.getBootstrapServers())
>         .withTopics(topicsList)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .commitOffsetsInFinalize()
>        // .withMaxNumRecords(5)
> )
> {code}
> But if we uncomment
> {code:java}
> .withMaxNumRecords(5){code}
> the code works and we are able to spin up a Dataflow job in the desired subnet to ingest the Kafka stream.
> {code:java}
> pipeline.apply("Read Kafka", KafkaIO.<String, String>read()
>         .withConsumerConfigUpdates(propertyBuilder)
>         .withConsumerConfigUpdates(
>                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group")
>         )
>         .withBootstrapServers(options.getBootstrapServers())
>         .withTopics(topicsList)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .commitOffsetsInFinalize()
>         .withMaxNumRecords(5)
> )
> {code}
> The issue with the above code is that the Dataflow job stops after ingesting the given number of records and behaves like a batch ingestion instead of streaming, which we don't want.
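The reporter's statement that the submitting machine cannot reach the subnet can be checked independently of Beam. The following is a minimal sketch using only the JDK; the class name `BrokerReachability`, the method `canConnect`, and the default `localhost:9092` address are illustrative assumptions, not part of the issue. A successful TCP connection does not prove that a Kafka client would work (authentication and advertised listeners are not exercised), but a failure here would be consistent with the time-out described above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    /**
     * Returns true if a plain TCP connection to host:port succeeds within
     * timeoutMillis. This only tests network reachability, not Kafka itself.
     */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unknown host, and connect time-outs.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults; pass the real bootstrap host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 5000));
    }
}
```

Running this once from the machine that submits the job and once from a Compute Engine instance inside the subnet would show whether the two environments differ in reachability.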
> *Google support team hypothesis:*
> The current hypothesis is that the issue occurs in `KafkaUnboundedSource.split()` [1], which fails because it cannot get the topic information.
> First, `withMaxNumRecords` is intended for testing [2]; when it is specified, the unbounded read is converted into a bounded read in `BoundedReadFromUnboundedSource` [3]. Without `withMaxNumRecords` the pipeline remains unbounded.
> When the pipeline is bounded (`withMaxNumRecords` specified), `split()` happens on the Dataflow worker in `SplitFn` [4]. Because it runs on Dataflow, it has no issue connecting to Kafka.
> But when the pipeline is unbounded (`withMaxNumRecords` commented out), `split()` is called when the pipeline is built locally during the graph construction phase [5][6], at which point it does not have access to Kafka.
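The hypothesis above comes down to where `split()` executes. The following toy model illustrates that difference only; it is not Beam code, and every name in it (`SplitPlacementSketch`, `ToyKafkaSource`, `Location`, the two `construct...` methods) is a hypothetical stand-in. It assumes, as the hypothesis does, that the broker is reachable from a Dataflow worker but not from the local machine.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class SplitPlacementSketch {
    enum Location { LOCAL_MACHINE, DATAFLOW_WORKER }

    /** Toy source: split() succeeds only where the broker is assumed reachable. */
    static final class ToyKafkaSource {
        List<String> split(Location where) {
            if (where != Location.DATAFLOW_WORKER) {
                throw new IllegalStateException(
                        "timeout fetching topic metadata from " + where);
            }
            return Arrays.asList("topic-0", "topic-1");
        }
    }

    /** Unbounded path as hypothesized: split() runs while the graph is built locally. */
    static List<String> constructUnboundedRead(ToyKafkaSource source) {
        return source.split(Location.LOCAL_MACHINE);
    }

    /** Bounded path as hypothesized: construction only wraps the source; split() runs later on a worker. */
    static Supplier<List<String>> constructBoundedRead(ToyKafkaSource source) {
        return () -> source.split(Location.DATAFLOW_WORKER);
    }

    public static void main(String[] args) {
        ToyKafkaSource source = new ToyKafkaSource();

        // Construction succeeds without any network access; the split is deferred.
        Supplier<List<String>> deferred = constructBoundedRead(source);
        System.out.println("bounded, split on worker: " + deferred.get());

        // Construction itself triggers the split, so it fails locally.
        try {
            constructUnboundedRead(source);
        } catch (IllegalStateException e) {
            System.out.println("unbounded, split at construction: " + e.getMessage());
        }
    }
}
```

In this model the bounded read never touches the network during construction, while the unbounded read fails before any job could be submitted, which matches the behavior reported in the issue.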
>  
> [0] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
> [1] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L57]
> [2] [https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withMaxNumRecords-long-]
> [3] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L191-L193]
> [4] [https://github.com/apache/beam/blob/v2.28.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L168-L169]
> [5] [https://github.com/apache/beam/blob/v2.28.0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L87]
> [6] [https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#pipeline-lifecycle:-from-pipeline-code-to-dataflow-job]
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)