You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Thalita Vergilio (JIRA)" <ji...@apache.org> on 2017/12/02 18:58:00 UTC

[jira] [Updated] (BEAM-3281) PTransform name not being propagated to the Flink Web UI

     [ https://issues.apache.org/jira/browse/BEAM-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Thalita Vergilio updated BEAM-3281:
-----------------------------------
    Description: 
This could be related to BEAM-1107, which was logged for Flink Batch processing.

I am experiencing a similar issue for stream processing. I would have expected the name passed to 
{code:java}
pipeline.apply(String name, PTransform<? super PBegin,OutputT> root)
{code}
 to be propagated to the Flink Web UI. 

The documentation seems to suggest that this was the intended functionality: 
https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-

Here is some sample code setting the name: 

{code:java}

p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
                .apply("Transform the Pipeline to Key by Window",
                        ParDo.of(
                                new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context, IntervalWindow window) {
                                        context.output(KV.of(window, context.element()));
                                    }
                                }))
                .apply("Group by Key (window)", GroupByKey.create())
                .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
                .apply("Write output to Kafka", KafkaIO.<IntervalWindowResult, PueResult>write()
                        .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
                        .withTopic("results")
                        .withKeySerializer(IntervalWindowResultSerialiser.class)
                        .withValueSerializer(PueResultSerialiser.class)
                );

{code}


I will upload a screenshot of the results.

  was:
This could be related to BEAM-1107, which was logged for Flink Batch processing.

I am experiencing a similar issue for stream processing. I would have expected the name passed to `pipeline.apply(String name, PTransform root)` to be propagated to the Flink Web UI. 

The documentation seems to suggest that this was the intended functionality: 
https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-

Here is some sample code setting the name: 

{{p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
                .apply("Transform the Pipeline to Key by Window",
                        ParDo.of(
                                new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context, IntervalWindow window) {
                                        context.output(KV.of(window, context.element()));
                                    }
                                }))
                .apply("Group by Key (window)", GroupByKey.create())
                .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
                .apply("Write output to Kafka", KafkaIO.<IntervalWindowResult, PueResult>write()
                        .withBootstrapServers(OSDC_KAFKA_IP + ":" + KAFKA_PORT)
                        .withTopic("results")
                        .withKeySerializer(IntervalWindowResultSerialiser.class)
                        .withValueSerializer(PueResultSerialiser.class)
                );
}}

I will upload a screenshot of the results.


> PTransform name not being propagated to the Flink Web UI
> --------------------------------------------------------
>
>                 Key: BEAM-3281
>                 URL: https://issues.apache.org/jira/browse/BEAM-3281
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>            Reporter: Thalita Vergilio
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>              Labels: flink
>
> This could be related to BEAM-1107, which was logged for Flink Batch processing.
> I am experiencing a similar issue for stream processing. I would have expected the name passed to 
> {code:java}
> pipeline.apply(String name, PTransform<? super PBegin,OutputT> root)
> {code}
>  to be propagated to the Flink Web UI. 
> The documentation seems to suggest that this was the intended functionality: 
> https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#apply-java.lang.String-org.apache.beam.sdk.transforms.PTransform-
> Here is some sample code setting the name: 
> {code:java}
> p.apply("Apply Windowing Function", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
>                 .apply("Transform the Pipeline to Key by Window",
>                         ParDo.of(
>                                 new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
>                                     @ProcessElement
>                                     public void processElement(ProcessContext context, IntervalWindow window) {
>                                         context.output(KV.of(window, context.element()));
>                                     }
>                                 }))
>                 .apply("Group by Key (window)", GroupByKey.create())
>                 .apply("Calculate PUE", ParDo.of(new PueCalculatorFn()))
>                 .apply("Write output to Kafka", KafkaIO.<IntervalWindowResult, PueResult>write()
>                         .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
>                         .withTopic("results")
>                         .withKeySerializer(IntervalWindowResultSerialiser.class)
>                         .withValueSerializer(PueResultSerialiser.class)
>                 );
> {code}
> I will upload a screenshot of the results.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)