You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by "Rajagopal, Viswanathan" <Ra...@DNB.com> on 2020/09/29 10:05:26 UTC

Support streaming side-inputs in the Spark runner

Hi Team,

I have a streaming pipeline (built using Apache Beam with Spark Runner)which consumes events tagged with timestamps from Unbounded source (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then, write all events in a window into a single / multiple files based on shards.
We are trying to achieve the following through Apache Beam constructs
1.       Create a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
2.       Have a hook method that invokes per window that enables us to do some operational activities per window.
3.       Stop the stream processor (graceful stop) from external system.

Approaches that we tried for 1).
*        Creating a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
*        Input Pcollection goes through FixedWindow transform.
*        Created custom CombineFn that takes combines all inputs for a window and produce single value Pcollection.
*        Output of Window transform it goes to CombineFn (custom fn) and creates a PCollectionView from CombineFn (using Combine.Globally().asSingletonView() as this output would be passed as a side-input for our main pipeline.
o   Getting the following exception (while running with streaming option set to true)
*        java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView

     *   Noticed that SparkRunner doesn't support the streaming side-inputs in the Spark runner
        *   https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java (View.CreatePCollectionView.class not added to EVALUATORS Map)
        *   https://issues.apache.org/jira/browse/BEAM-2112
        *   https://issues.apache.org/jira/browse/BEAM-1564
So would like to understand on this BEAM-1564 ticket.

Approaches that we tried for 2).
Tried to implement the operational activities in extractOutput() of CombineFn as extractOutput() called once per window. We hadn't tested this as this is blocked by Issue 1).
Is there any other recommended approaches to implement this feature?

Looking for recommended approaches to implement feature 3).

Many Thanks,
Viswa.





Re: Support streaming side-inputs in the Spark runner

Posted by Ismaël Mejía <ie...@gmail.com>.
The limitation of non being able to have side inputs in streaming has been
pending since a long time ago, and there is sadly not anyone to my
knowledge working on this.

One extra aspect to have in mind is that the support for streaming in the
Spark runner uses the Spark DStream API which does not have explicit
watermark handling so the runner has to implement this, this is where the
issues of lacking holds exist, this hopefully could be fixed in the future.

One important detail is that the native Spark Structured Streaming API has
the concept of Watermark but it is more limited than the definition of
Watermark on Beam so even the Spark runner that uses the new API would
require to implement Watermark handling too.

Notice that if you go with native Spark Streaming instead of Beam please
pay attention to the fact that the support for streaming is still limited
for example you cannot do two aggregations in sequence (this is a
limitation that affects the ongoing work on the Structured Streaming
runner) https://issues.apache.org/jira/browse/SPARK-26655 also less
important but still good to know you cannot do continuous processing on
Spark save for  map-like operations, so you should default to the
micro-batch based approach if you any aggregation.
https://issues.apache.org/jira/browse/SPARK-20928



On Fri, Oct 2, 2020 at 9:05 PM Luke Cwik <lc...@google.com> wrote:

> Support for watermark holds is missing for both Spark streaming
> implementations (DStream and structured streaming) so watermark based
> triggers don't produce the correct output.
>
> Excluding the direct runner, Flink is the OSS runner with the most people
> working on it adding features and fixing bugs in it.
> Spark batch is in a good state but streaming development is still ongoing
> and also has a small group of folks.
>
>
> On Fri, Oct 2, 2020 at 10:16 AM <tc...@tutanota.com> wrote:
>
>> For clarification, is it just streaming side inputs that present an issue
>> for SparkRunner or are there other areas that need work?  We've started
>> work on a Beam-based project that includes both streaming and batch
>> oriented work and a Spark cluster was our choice due to the perception that
>> it could handle both types of applications.
>>
>> However, that would have to be reevaluated if SparkRunner isn't up for
>> streaming deployments.  And it seems that SparkStructuredStreamingRunner
>> still needs some time before it's a fully-featured solution.  I guess I'm
>> trying to get a sense of whether these runners are still being actively
>> developed or were they donated by a third-party and are now suffering from
>> bit-rot.
>>
>> Oct 1, 2020, 10:54 by lcwik@google.com:
>>
>> I would suggest trying FlinkRunner as it is a much more complete
>> streaming implementation.
>> SparkRunner has several key things that are missing that won't allow your
>> pipeline to function correctly.
>> If you're really invested in getting SparkRunner working though feel free
>> to contribute the necessary implementations for watermark holds and
>> broadcast state necessary for side inputs.
>>
>> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <
>> RajagopalV@dnb.com> wrote:
>>
>> Hi Team,
>>
>>
>>
>> I have a streaming pipeline (built using Apache Beam with Spark
>> Runner)which consumes events tagged with timestamps from Unbounded source
>> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
>> write all events in a window into a single / multiple files based on shards.
>>
>> We are trying to achieve the following through Apache Beam constructs
>>
>> *1.       **Create a PCollectionView from unbounded source and pass it
>> as a side-input to our main pipeline.*
>>
>> *2.       **Have a hook method that invokes per window that enables us
>> to do some operational activities per window.*
>>
>> *3.       **Stop the stream processor (graceful stop) from external
>> system.*
>>
>>
>>
>> *Approaches that we tried for 1).*
>>
>> ·        Creating a PCollectionView from unbounded source and pass it as
>> a side-input to our main pipeline.
>>
>> ·        Input Pcollection goes through FixedWindow transform.
>>
>> ·        Created custom CombineFn that takes combines all inputs for a
>> window and produce single value Pcollection.
>>
>> ·        Output of Window transform it goes to CombineFn (custom fn) and
>> creates a PCollectionView from CombineFn (using
>> Combine.Globally().asSingletonView() as this output would be passed as a
>> side-input for our main pipeline.
>>
>> o   Getting the following exception (while running with streaming option
>> set to true)
>>
>> ·        java.lang.IllegalStateException: No TransformEvaluator
>> registered for UNBOUNDED transform View.CreatePCollectionView
>>
>>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>>       in the Spark runner
>>       -
>>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>>          - https://issues.apache.org/jira/browse/BEAM-2112
>>          - https://issues.apache.org/jira/browse/BEAM-1564
>>
>> So would like to understand on this BEAM-1564 ticket.
>>
>>
>>
>> *Approaches that we tried for 2).*
>>
>> Tried to implement the operational activities in extractOutput() of
>> CombineFn as extractOutput() called once per window. We hadn’t tested this
>> as this is blocked by Issue 1).
>>
>> Is there any other recommended approaches to implement this feature?
>>
>>
>>
>> *Looking for recommended approaches to implement feature 3).*
>>
>>
>>
>> Many Thanks,
>>
>> Viswa.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>

Re: Support streaming side-inputs in the Spark runner

Posted by Ismaël Mejía <ie...@gmail.com>.
The limitation of non being able to have side inputs in streaming has been
pending since a long time ago, and there is sadly not anyone to my
knowledge working on this.

One extra aspect to have in mind is that the support for streaming in the
Spark runner uses the Spark DStream API which does not have explicit
watermark handling so the runner has to implement this, this is where the
issues of lacking holds exist, this hopefully could be fixed in the future.

One important detail is that the native Spark Structured Streaming API has
the concept of Watermark but it is more limited than the definition of
Watermark on Beam so even the Spark runner that uses the new API would
require to implement Watermark handling too.

Notice that if you go with native Spark Streaming instead of Beam please
pay attention to the fact that the support for streaming is still limited
for example you cannot do two aggregations in sequence (this is a
limitation that affects the ongoing work on the Structured Streaming
runner) https://issues.apache.org/jira/browse/SPARK-26655 also less
important but still good to know you cannot do continuous processing on
Spark save for  map-like operations, so you should default to the
micro-batch based approach if you any aggregation.
https://issues.apache.org/jira/browse/SPARK-20928



On Fri, Oct 2, 2020 at 9:05 PM Luke Cwik <lc...@google.com> wrote:

> Support for watermark holds is missing for both Spark streaming
> implementations (DStream and structured streaming) so watermark based
> triggers don't produce the correct output.
>
> Excluding the direct runner, Flink is the OSS runner with the most people
> working on it adding features and fixing bugs in it.
> Spark batch is in a good state but streaming development is still ongoing
> and also has a small group of folks.
>
>
> On Fri, Oct 2, 2020 at 10:16 AM <tc...@tutanota.com> wrote:
>
>> For clarification, is it just streaming side inputs that present an issue
>> for SparkRunner or are there other areas that need work?  We've started
>> work on a Beam-based project that includes both streaming and batch
>> oriented work and a Spark cluster was our choice due to the perception that
>> it could handle both types of applications.
>>
>> However, that would have to be reevaluated if SparkRunner isn't up for
>> streaming deployments.  And it seems that SparkStructuredStreamingRunner
>> still needs some time before it's a fully-featured solution.  I guess I'm
>> trying to get a sense of whether these runners are still being actively
>> developed or were they donated by a third-party and are now suffering from
>> bit-rot.
>>
>> Oct 1, 2020, 10:54 by lcwik@google.com:
>>
>> I would suggest trying FlinkRunner as it is a much more complete
>> streaming implementation.
>> SparkRunner has several key things that are missing that won't allow your
>> pipeline to function correctly.
>> If you're really invested in getting SparkRunner working though feel free
>> to contribute the necessary implementations for watermark holds and
>> broadcast state necessary for side inputs.
>>
>> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <
>> RajagopalV@dnb.com> wrote:
>>
>> Hi Team,
>>
>>
>>
>> I have a streaming pipeline (built using Apache Beam with Spark
>> Runner)which consumes events tagged with timestamps from Unbounded source
>> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
>> write all events in a window into a single / multiple files based on shards.
>>
>> We are trying to achieve the following through Apache Beam constructs
>>
>> *1.       **Create a PCollectionView from unbounded source and pass it
>> as a side-input to our main pipeline.*
>>
>> *2.       **Have a hook method that invokes per window that enables us
>> to do some operational activities per window.*
>>
>> *3.       **Stop the stream processor (graceful stop) from external
>> system.*
>>
>>
>>
>> *Approaches that we tried for 1).*
>>
>> ·        Creating a PCollectionView from unbounded source and pass it as
>> a side-input to our main pipeline.
>>
>> ·        Input Pcollection goes through FixedWindow transform.
>>
>> ·        Created custom CombineFn that takes combines all inputs for a
>> window and produce single value Pcollection.
>>
>> ·        Output of Window transform it goes to CombineFn (custom fn) and
>> creates a PCollectionView from CombineFn (using
>> Combine.Globally().asSingletonView() as this output would be passed as a
>> side-input for our main pipeline.
>>
>> o   Getting the following exception (while running with streaming option
>> set to true)
>>
>> ·        java.lang.IllegalStateException: No TransformEvaluator
>> registered for UNBOUNDED transform View.CreatePCollectionView
>>
>>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>>       in the Spark runner
>>       -
>>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>>          - https://issues.apache.org/jira/browse/BEAM-2112
>>          - https://issues.apache.org/jira/browse/BEAM-1564
>>
>> So would like to understand on this BEAM-1564 ticket.
>>
>>
>>
>> *Approaches that we tried for 2).*
>>
>> Tried to implement the operational activities in extractOutput() of
>> CombineFn as extractOutput() called once per window. We hadn’t tested this
>> as this is blocked by Issue 1).
>>
>> Is there any other recommended approaches to implement this feature?
>>
>>
>>
>> *Looking for recommended approaches to implement feature 3).*
>>
>>
>>
>> Many Thanks,
>>
>> Viswa.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>

Re: Support streaming side-inputs in the Spark runner

Posted by Luke Cwik <lc...@google.com>.
Support for watermark holds is missing for both Spark streaming
implementations (DStream and structured streaming) so watermark based
triggers don't produce the correct output.

Excluding the direct runner, Flink is the OSS runner with the most people
working on it adding features and fixing bugs in it.
Spark batch is in a good state but streaming development is still ongoing
and also has a small group of folks.


On Fri, Oct 2, 2020 at 10:16 AM <tc...@tutanota.com> wrote:

> For clarification, is it just streaming side inputs that present an issue
> for SparkRunner or are there other areas that need work?  We've started
> work on a Beam-based project that includes both streaming and batch
> oriented work and a Spark cluster was our choice due to the perception that
> it could handle both types of applications.
>
> However, that would have to be reevaluated if SparkRunner isn't up for
> streaming deployments.  And it seems that SparkStructuredStreamingRunner
> still needs some time before it's a fully-featured solution.  I guess I'm
> trying to get a sense of whether these runners are still being actively
> developed or were they donated by a third-party and are now suffering from
> bit-rot.
>
> Oct 1, 2020, 10:54 by lcwik@google.com:
>
> I would suggest trying FlinkRunner as it is a much more complete streaming
> implementation.
> SparkRunner has several key things that are missing that won't allow your
> pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free
> to contribute the necessary implementations for watermark holds and
> broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <Ra...@dnb.com>
> wrote:
>
> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with Spark
> Runner)which consumes events tagged with timestamps from Unbounded source
> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
> write all events in a window into a single / multiple files based on shards.
>
> We are trying to achieve the following through Apache Beam constructs
>
> *1.       **Create a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.*
>
> *2.       **Have a hook method that invokes per window that enables us to
> do some operational activities per window.*
>
> *3.       **Stop the stream processor (graceful stop) from external
> system.*
>
>
>
> *Approaches that we tried for 1).*
>
> ·        Creating a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.
>
> ·        Input Pcollection goes through FixedWindow transform.
>
> ·        Created custom CombineFn that takes combines all inputs for a
> window and produce single value Pcollection.
>
> ·        Output of Window transform it goes to CombineFn (custom fn) and
> creates a PCollectionView from CombineFn (using
> Combine.Globally().asSingletonView() as this output would be passed as a
> side-input for our main pipeline.
>
> o   Getting the following exception (while running with streaming option
> set to true)
>
> ·        java.lang.IllegalStateException: No TransformEvaluator
> registered for UNBOUNDED transform View.CreatePCollectionView
>
>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>       in the Spark runner
>       -
>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>          - https://issues.apache.org/jira/browse/BEAM-2112
>          - https://issues.apache.org/jira/browse/BEAM-1564
>
> So would like to understand on this BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn as extractOutput() called once per window. We hadn’t tested this
> as this is blocked by Issue 1).
>
> Is there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>
>
>
>
>
>
>
>
>
>

Re: Support streaming side-inputs in the Spark runner

Posted by Luke Cwik <lc...@google.com>.
Support for watermark holds is missing for both Spark streaming
implementations (DStream and structured streaming) so watermark based
triggers don't produce the correct output.

Excluding the direct runner, Flink is the OSS runner with the most people
working on it adding features and fixing bugs in it.
Spark batch is in a good state but streaming development is still ongoing
and also has a small group of folks.


On Fri, Oct 2, 2020 at 10:16 AM <tc...@tutanota.com> wrote:

> For clarification, is it just streaming side inputs that present an issue
> for SparkRunner or are there other areas that need work?  We've started
> work on a Beam-based project that includes both streaming and batch
> oriented work and a Spark cluster was our choice due to the perception that
> it could handle both types of applications.
>
> However, that would have to be reevaluated if SparkRunner isn't up for
> streaming deployments.  And it seems that SparkStructuredStreamingRunner
> still needs some time before it's a fully-featured solution.  I guess I'm
> trying to get a sense of whether these runners are still being actively
> developed or were they donated by a third-party and are now suffering from
> bit-rot.
>
> Oct 1, 2020, 10:54 by lcwik@google.com:
>
> I would suggest trying FlinkRunner as it is a much more complete streaming
> implementation.
> SparkRunner has several key things that are missing that won't allow your
> pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free
> to contribute the necessary implementations for watermark holds and
> broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <Ra...@dnb.com>
> wrote:
>
> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with Spark
> Runner)which consumes events tagged with timestamps from Unbounded source
> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
> write all events in a window into a single / multiple files based on shards.
>
> We are trying to achieve the following through Apache Beam constructs
>
> *1.       **Create a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.*
>
> *2.       **Have a hook method that invokes per window that enables us to
> do some operational activities per window.*
>
> *3.       **Stop the stream processor (graceful stop) from external
> system.*
>
>
>
> *Approaches that we tried for 1).*
>
> ·        Creating a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.
>
> ·        Input Pcollection goes through FixedWindow transform.
>
> ·        Created custom CombineFn that takes combines all inputs for a
> window and produce single value Pcollection.
>
> ·        Output of Window transform it goes to CombineFn (custom fn) and
> creates a PCollectionView from CombineFn (using
> Combine.Globally().asSingletonView() as this output would be passed as a
> side-input for our main pipeline.
>
> o   Getting the following exception (while running with streaming option
> set to true)
>
> ·        java.lang.IllegalStateException: No TransformEvaluator
> registered for UNBOUNDED transform View.CreatePCollectionView
>
>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>       in the Spark runner
>       -
>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>          - https://issues.apache.org/jira/browse/BEAM-2112
>          - https://issues.apache.org/jira/browse/BEAM-1564
>
> So would like to understand on this BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn as extractOutput() called once per window. We hadn’t tested this
> as this is blocked by Issue 1).
>
> Is there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>
>
>
>
>
>
>
>
>
>

Re: Support streaming side-inputs in the Spark runner

Posted by tc...@tutanota.com.
For clarification, is it just streaming side inputs that present an issue for SparkRunner or are there other areas that need work?  We've started work on a Beam-based project that includes both streaming and batch oriented work and a Spark cluster was our choice due to the perception that it could handle both types of applications.

However, that would have to be reevaluated if SparkRunner isn't up for streaming deployments.  And it seems that SparkStructuredStreamingRunner still needs some time before it's a fully-featured solution.  I guess I'm trying to get a sense of whether these runners are still being actively developed or were they donated by a third-party and are now suffering from bit-rot.

Oct 1, 2020, 10:54 by lcwik@google.com:

> I would suggest trying FlinkRunner as it is a much more complete streaming implementation.
> SparkRunner has several key things that are missing that won't allow your pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free to contribute the necessary implementations for watermark holds and broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <> RajagopalV@dnb.com> > wrote:
>
>>
>> Hi Team,
>>
>>
>>  
>>
>>
>> I have a streaming pipeline (built using Apache Beam with Spark Runner)which consumes events tagged with timestamps from Unbounded source (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then, write all events in a window into a single / multiple files based on shards.
>>
>>
>> We are trying to achieve the following through Apache Beam constructs
>>
>>
>> 1.>>        >> Create a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
>>
>>
>> 2.>>        >> Have a hook method that invokes per window that enables us to do some operational activities per window.
>>
>>
>> 3.>>        >> Stop the stream processor (graceful stop) from external system.
>>
>>
>>  
>>
>>
>> Approaches that we tried for 1).
>>
>>
>> ·>>         >> Creating a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
>>
>>
>> ·>>         >> Input Pcollection goes through FixedWindow transform.
>>
>>
>> ·>>         >> Created custom CombineFn that takes combines all inputs for a window and produce single value Pcollection.
>>
>>
>> ·>>         >> Output of Window transform >>  it goes to CombineFn (custom fn) and creates a PCollectionView from CombineFn (using Combine.Globally().asSingletonView() as this output would be passed as a side-input for our main pipeline.
>>
>>
>> o>>    >> Getting the following exception (while running with streaming option set to true)
>>
>>
>> ·>>         >> java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>>
>> Noticed that SparkRunner doesn’t support the streaming side-inputs in the Spark runner
>> https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java>>  (View.CreatePCollectionView.class not added to EVALUATORS Map)
>> https://issues.apache.org/jira/browse/BEAM-2112
>> https://issues.apache.org/jira/browse/BEAM-1564
>>
>> So would like to understand on this BEAM-1564 ticket.
>>
>>
>>  
>>
>>
>> Approaches that we tried for 2).
>>
>>
>> Tried to implement the operational activities in extractOutput() of CombineFn as extractOutput() called once per window. We hadn’t tested this as this is blocked by Issue 1).
>>
>>
>> Is there any other recommended approaches to implement this feature?
>>
>>
>>  
>>
>>
>> Looking for recommended approaches to implement feature 3).
>>
>>
>>  
>>
>>
>> Many Thanks,
>>
>>
>> Viswa.
>>
>>
>>  
>>
>>
>>  
>>
>>
>>  
>>
>>
>>  
>>
>>


Re: Support streaming side-inputs in the Spark runner

Posted by tc...@tutanota.com.
For clarification, is it just streaming side inputs that present an issue for SparkRunner or are there other areas that need work?  We've started work on a Beam-based project that includes both streaming and batch oriented work and a Spark cluster was our choice due to the perception that it could handle both types of applications.

However, that would have to be reevaluated if SparkRunner isn't up for streaming deployments.  And it seems that SparkStructuredStreamingRunner still needs some time before it's a fully-featured solution.  I guess I'm trying to get a sense of whether these runners are still being actively developed or were they donated by a third-party and are now suffering from bit-rot.

Oct 1, 2020, 10:54 by lcwik@google.com:

> I would suggest trying FlinkRunner as it is a much more complete streaming implementation.
> SparkRunner has several key things that are missing that won't allow your pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free to contribute the necessary implementations for watermark holds and broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <> RajagopalV@dnb.com> > wrote:
>
>>
>> Hi Team,
>>
>>
>>  
>>
>>
>> I have a streaming pipeline (built using Apache Beam with Spark Runner)which consumes events tagged with timestamps from Unbounded source (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then, write all events in a window into a single / multiple files based on shards.
>>
>>
>> We are trying to achieve the following through Apache Beam constructs
>>
>>
>> 1.>>        >> Create a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
>>
>>
>> 2.>>        >> Have a hook method that invokes per window that enables us to do some operational activities per window.
>>
>>
>> 3.>>        >> Stop the stream processor (graceful stop) from external system.
>>
>>
>>  
>>
>>
>> Approaches that we tried for 1).
>>
>>
>> ·>>         >> Creating a PCollectionView from unbounded source and pass it as a side-input to our main pipeline.
>>
>>
>> ·>>         >> Input Pcollection goes through FixedWindow transform.
>>
>>
>> ·>>         >> Created custom CombineFn that takes combines all inputs for a window and produce single value Pcollection.
>>
>>
>> ·>>         >> Output of Window transform >>  it goes to CombineFn (custom fn) and creates a PCollectionView from CombineFn (using Combine.Globally().asSingletonView() as this output would be passed as a side-input for our main pipeline.
>>
>>
>> o>>    >> Getting the following exception (while running with streaming option set to true)
>>
>>
>> ·>>         >> java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>>
>> Noticed that SparkRunner doesn’t support the streaming side-inputs in the Spark runner
>> https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java>>  (View.CreatePCollectionView.class not added to EVALUATORS Map)
>> https://issues.apache.org/jira/browse/BEAM-2112
>> https://issues.apache.org/jira/browse/BEAM-1564
>>
>> So would like to understand on this BEAM-1564 ticket.
>>
>>
>>  
>>
>>
>> Approaches that we tried for 2).
>>
>>
>> Tried to implement the operational activities in extractOutput() of CombineFn as extractOutput() called once per window. We hadn’t tested this as this is blocked by Issue 1).
>>
>>
>> Is there any other recommended approaches to implement this feature?
>>
>>
>>  
>>
>>
>> Looking for recommended approaches to implement feature 3).
>>
>>
>>  
>>
>>
>> Many Thanks,
>>
>>
>> Viswa.
>>
>>
>>  
>>
>>
>>  
>>
>>
>>  
>>
>>
>>  
>>
>>


Re: Support streaming side-inputs in the Spark runner

Posted by Luke Cwik <lc...@google.com>.
I would suggest trying FlinkRunner as it is a much more complete streaming
implementation.
SparkRunner has several key things that are missing that won't allow your
pipeline to function correctly.
If you're really invested in getting SparkRunner working though feel free
to contribute the necessary implementations for watermark holds and
broadcast state necessary for side inputs.

On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <Ra...@dnb.com>
wrote:

> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with Spark
> Runner)which consumes events tagged with timestamps from Unbounded source
> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
> write all events in a window into a single / multiple files based on shards.
>
> We are trying to achieve the following through Apache Beam constructs
>
> *1.       **Create a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.*
>
> *2.       **Have a hook method that invokes per window that enables us to
> do some operational activities per window.*
>
> *3.       **Stop the stream processor (graceful stop) from external
> system.*
>
>
>
> *Approaches that we tried for 1).*
>
> ·        Creating a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.
>
> ·        Input Pcollection goes through FixedWindow transform.
>
> ·        Created custom CombineFn that takes combines all inputs for a
> window and produce single value Pcollection.
>
> ·        Output of Window transform it goes to CombineFn (custom fn) and
> creates a PCollectionView from CombineFn (using
> Combine.Globally().asSingletonView() as this output would be passed as a
> side-input for our main pipeline.
>
> o   Getting the following exception (while running with streaming option
> set to true)
>
> ·        java.lang.IllegalStateException: No TransformEvaluator
> registered for UNBOUNDED transform View.CreatePCollectionView
>
>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>       in the Spark runner
>          -
>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>          - https://issues.apache.org/jira/browse/BEAM-2112
>          - https://issues.apache.org/jira/browse/BEAM-1564
>
> So would like to understand on this BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn as extractOutput() called once per window. We hadn’t tested this
> as this is blocked by Issue 1).
>
> Is there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>
>
>
>
>
>
>
>

Re: Support streaming side-inputs in the Spark runner

Posted by Luke Cwik <lc...@google.com>.
I would suggest trying FlinkRunner as it is a much more complete streaming
implementation.
SparkRunner has several key things that are missing that won't allow your
pipeline to function correctly.
If you're really invested in getting SparkRunner working though feel free
to contribute the necessary implementations for watermark holds and
broadcast state necessary for side inputs.

On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan <Ra...@dnb.com>
wrote:

> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with Spark
> Runner)which consumes events tagged with timestamps from Unbounded source
> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
> write all events in a window into a single / multiple files based on shards.
>
> We are trying to achieve the following through Apache Beam constructs
>
> *1.       **Create a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.*
>
> *2.       **Have a hook method that invokes per window that enables us to
> do some operational activities per window.*
>
> *3.       **Stop the stream processor (graceful stop) from external
> system.*
>
>
>
> *Approaches that we tried for 1).*
>
> ·        Creating a PCollectionView from unbounded source and pass it as
> a side-input to our main pipeline.
>
> ·        Input Pcollection goes through FixedWindow transform.
>
> ·        Created custom CombineFn that takes combines all inputs for a
> window and produce single value Pcollection.
>
> ·        Output of Window transform it goes to CombineFn (custom fn) and
> creates a PCollectionView from CombineFn (using
> Combine.Globally().asSingletonView() as this output would be passed as a
> side-input for our main pipeline.
>
> o   Getting the following exception (while running with streaming option
> set to true)
>
> ·        java.lang.IllegalStateException: No TransformEvaluator
> registered for UNBOUNDED transform View.CreatePCollectionView
>
>    - Noticed that SparkRunner doesn’t support the streaming side-inputs
>       in the Spark runner
>          -
>          https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>          (View.CreatePCollectionView.class not added to EVALUATORS Map)
>          - https://issues.apache.org/jira/browse/BEAM-2112
>          - https://issues.apache.org/jira/browse/BEAM-1564
>
> So would like to understand on this BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of
> CombineFn as extractOutput() called once per window. We hadn’t tested this
> as this is blocked by Issue 1).
>
> Is there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>
>
>
>
>
>
>
>