Posted to user@flink.apache.org by "Sanabria, Carlos" <ca...@accenture.com> on 2022/06/09 10:05:16 UTC

RE: [External] Re: Source vs SourceFunction and testing

Hi everyone!

Sorry for reopening the thread, but I am having some problems related to this case while migrating our code from Flink 1.12 to Flink 1.15.

We have a base project that encapsulates a ton of common code and configurations. One of the abstractions we have is an AbstractDataStreamJob class that has generic Sources and Sinks. We implemented it like this since Flink 1.8, following the recommendations of the Flink documentation [1]:

"Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster.
...
A few remarks on integration testing with MiniClusterWithClientResource:
- In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
..."

This way, we can create the real Kafka Sources and Sinks in the Main class of the job, create the test Sources and Sinks in the JUnit tests, and inject them into the AbstractDataStreamJob class.
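For illustration, here is a minimal plain-Java sketch of that injection pattern. The names (AbstractDataStreamJob, the Supplier/Consumer stand-ins for Flink's Source and Sink types) are hypothetical simplifications, not our actual classes or Flink's API; the point is only that the pipeline logic is written once and the wiring decides between Kafka and test doubles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Stand-in for a job class with pluggable source and sink.
class AbstractDataStreamJob {
    private final Supplier<List<String>> source; // real job: a Flink Source
    private final Consumer<List<String>> sink;   // real job: a Flink Sink

    AbstractDataStreamJob(Supplier<List<String>> source, Consumer<List<String>> sink) {
        this.source = source;
        this.sink = sink;
    }

    // The pipeline logic is identical in production and in tests.
    void run() {
        List<String> transformed = source.get().stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        sink.accept(transformed);
    }
}

public class PluggableJobDemo {
    public static void main(String[] args) {
        // Test wiring: in-memory source and sink instead of Kafka.
        List<String> collected = new ArrayList<>();
        new AbstractDataStreamJob(() -> List.of("a", "b"), collected::addAll).run();
        System.out.println(collected); // [A, B]
    }
}
```

In the real Main class the first constructor argument would build a KafkaSource, while the JUnit test passes an in-memory source, which is exactly the "pluggable sources and sinks" recommendation from the documentation.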

The problem comes with the new Source interface and the end-to-end tests against the local embedded mini cluster. Prior to Flink 1.15, we used the FromElementsFunction to create the test SourceFunction. Now that we have changed the code to use the new Source interface, we cannot use the FromElementsFunction anymore, and we have not found an equivalent FromElementsSource class with the same functionality implemented against the new Source API.

We want to keep the same structure in the AbstractDataStreamJob class (with generic and pluggable sources and sinks), as we think it is the most elegant and generic solution.

Is it planned to implement a FromElementsSource class that extends the new Source API? Is there any other alternative that may serve as a workaround for the moment?

We have tried to implement a custom Source for this use case, but it seems like an overwhelming task and we do not want to reinvent the wheel either. If a FromElementsSource is planned, we would rather wait for it.

Thanks!
Carlos

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource

-----Original Message-----
From: Qingsheng Ren <re...@gmail.com>
Sent: miércoles, 25 de mayo de 2022 12:10
To: Piotr Domagalski <pi...@domagalski.com>
Cc: user@flink.apache.org
Subject: [External] Re: Source vs SourceFunction and testing

This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with links and attachments.

Glad to see you have resolved the issue!

If you want to learn more about the Source API, the Flink document [1] has a detailed description about it. The original proposal FLIP-27 [2] is also a good reference.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers,

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
>
> Thank you Qingsheng, this context helps a lot!
>
> And once again thank you all for being such a helpful community!
>
> P.S. I actually struggled for a bit trying to understand why my refactored solution which accepts DataStream<> wouldn't work ("no operators defined in the streaming topology"). Turns out, my assumption that I can call StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get the same environment, was wrong. I had env.addSource and env.fromSource calls using one instance of the environment, but then called env.execute() on another instance :facepalm:
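The pitfall in that P.S. can be modeled in a few lines of plain Java. This is a hypothetical toy (MiniEnv is not Flink's StreamExecutionEnvironment), assuming only that each lookup call returns a fresh environment, as it can in a local/test context: operators registered on one instance are invisible to execute() on another.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for StreamExecutionEnvironment.
final class MiniEnv {
    private final List<String> operators = new ArrayList<>();

    // Here every call creates a fresh instance, mimicking the situation
    // where two lookups do NOT return the same environment.
    static MiniEnv getExecutionEnvironment() {
        return new MiniEnv();
    }

    void addSource(String name) {
        operators.add(name);
    }

    int execute() {
        if (operators.isEmpty()) {
            throw new IllegalStateException("no operators defined in the streaming topology");
        }
        return operators.size();
    }
}

public class EnvPitfall {
    public static void main(String[] args) {
        // Buggy wiring: sources added on one instance, execute() on another.
        MiniEnv envA = MiniEnv.getExecutionEnvironment();
        MiniEnv envB = MiniEnv.getExecutionEnvironment();
        envA.addSource("kafka");
        try {
            envB.execute(); // envB never saw the source
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }

        // Fix: create the environment once and pass that single instance around.
        MiniEnv env = MiniEnv.getExecutionEnvironment();
        env.addSource("kafka");
        System.out.println("operators on single instance: " + env.execute());
    }
}
```

The fix is simply to obtain the environment once and thread that one reference through all addSource/fromSource calls and the final execute().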
>
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <re...@gmail.com> wrote:
> Hi Piotr,
>
> I’d like to share my understanding about this. Source and SourceFunction are both interfaces to data sources. SourceFunction was designed and introduced earlier and as the project evolved, many shortcomings emerged. Therefore, the community re-designed the source interface and introduced the new Source API in FLIP-27 [1].
>
> Eventually we will deprecate SourceFunction and use Source as the only interface for all data sources, but considering the huge cost of migration you’ll see SourceFunction and Source co-exist for some time: the ParallelTestSource you mentioned is still on SourceFunction, while KafkaSource, as a pioneer, has already migrated to the new Source API.
>
> I think the API to end users didn't change a lot: both env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, and you could apply downstream transformations onto it.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Hi Ken,
> >
> > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and being still confused about differences between Source, SourceFunction, DataStream, DataStreamOperator, etc.
> >
> > I think the DataStream<> type is what I'm looking for? That is, then I can use:
> >
> > DataStream<EventData> source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> >
> > and
> > DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> >
> > Does that sound right?
> >
> > [1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> >
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kk...@transpac.com> wrote:
> > Hi Piotr,
> >
> > The way I handle this is via a workflow class that uses a builder approach to specifying inputs, outputs, and any other configuration settings.
> >
> > The inputs are typically DataStream<xxx>.
> >
> > This way I can separate out the Kafka inputs, and use testing sources that give me very precise control over the inputs (e.g. I can hold up on right side data to ensure my stateful left join junction is handling deferred joins properly). I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.
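A minimal plain-Java sketch of the builder-style workflow class Ken describes (List<String> stands in for DataStream<...>; all class and method names here are illustrative, not from any real codebase):

```java
import java.util.ArrayList;
import java.util.List;

// Workflow logic written once; inputs and outputs are plugged in via a builder.
class Workflow {
    private List<String> input;
    private List<String> output;

    Workflow setInput(List<String> input) {
        this.input = input;
        return this;
    }

    Workflow setOutput(List<String> output) {
        this.output = output;
        return this;
    }

    void execute() {
        // The actual transformation pipeline lives here.
        for (String s : input) {
            output.add(s.trim());
        }
    }
}

public class WorkflowDemo {
    public static void main(String[] args) {
        // Test wiring: an in-memory input and a collecting output.
        List<String> out = new ArrayList<>();
        new Workflow()
                .setInput(List.of(" a ", "b"))
                .setOutput(out)
                .execute();
        System.out.println(out); // [a, b]
    }
}
```

A production main method would call the same builder but pass streams built from the real Kafka sources.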
> >
> > Then in the actual tool class (with a main method) I’ll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.
> >
> > — Ken
> >
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> >>
> >> Hi,
> >>
> >> I'm wondering: what is the recommended way to structure a job which one would like to test later on with `MiniCluster`?
> >>
> >> I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource` which is then passed to `env.fromSource(...`.
> >>
> >> Is there any recommended way of handling these discrepancies, ie. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >>
> >> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >>
> >> --
> >> Piotr Domagalski
> >
> > --------------------------
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> >
> >
> >
> >
> >
> > --
> > Piotr Domagalski
>
>
>
> --
> Piotr Domagalski


________________________________

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. Your privacy is important to us. Accenture uses your personal data only in compliance with data protection laws. For further information on how Accenture processes your personal data, please see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
______________________________________________________________________________________

www.accenture.com

Re: [External] Re: Source vs SourceFunction and testing

Posted by Jing Ge <ji...@ververica.com>.
Hi Carlos,

You might want to join the discussion about FLIP-238[1] to share your
thoughts with us. Thanks!

Best regards,
Jing

[1] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt



RE: [External] Re: Source vs SourceFunction and testing

Posted by "Sanabria, Carlos" <ca...@accenture.com>.
Thanks for your quick response!

Yes, this is exactly what we were looking for!
Seems like a really nice feature. Even better than the FromElementsSource we were asking for, because it allows generating the events dynamically.

Is there any way we can vote for FLIP-238 to be accepted?

-----Original Message-----
From: Qingsheng Ren <re...@gmail.com> 
Sent: jueves, 9 de junio de 2022 12:16
To: Sanabria, Carlos <ca...@accenture.com>
Cc: user <us...@flink.apache.org>
Subject: Re: [External] Re: Source vs SourceFunction and testing

Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I think this is what you are looking for. This FLIP was created just days ago so it may take some time to get accepted and released.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng

On Thu, Jun 9, 2022 at 6:05 PM Sanabria, Carlos <ca...@accenture.com> wrote:
>
> Hi everyone!
>
> Sorry for reopening the thread, but I am having some problems related to this case while migrating our code from Flink 1.12 to Flink 1.15.
>
> We have a base project that encapsulates a ton of common code and configurations. One of the abstractions we have is an AbstractDataStreamJob class that has generic Sources and Sinks. We implemented it like this since Flink 1.8, following the recommendations of the Flink documentation [1]:
>
> "Apache Flink provides a JUnit rule called MiniClusterWithClientResource for testing complete jobs against a local, embedded mini cluster. called MiniClusterWithClientResource.
> ...
> A few remarks on integration testing with MiniClusterWithClientResource:
> - In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
> ..."
>
> This way, we can create the real Kafka Sources and Sinks in the Main class of the job, and also create the test Sources and Sinks in the Junit tests, and inject them in the AbstractDataStreamJob class.
>
> The problem comes with the new Source interface and the end to end tests against the local embedded mini cluster. Prior to Flink 1.15, we used the FromElementsFunction to create the test SourceFunction. Now that we changed the code to use the new Source interface, we cannot use the FromElementsFunction anymore, and we haven't found an equivalent FromElementsSource class with the same functionality but implemented using the new Source API.
>
> We want to keep the same structure in the AbstractDataStreamJob class (with generic and pluggable sources and sinks), as we think it is the most elegant and generic solution.
>
> Is it planned to implement a FromElementsSource class that extends the new Source API? Is there any other alternative that may serve as a workaround for the moment?
>
> We have tried to implement a custom Source for this use case, but it seems like an overwhelming task and we do not want to reinvent the wheel either. If it is planned to implement the FromElementsSource we'd rather prefer to wait for it.
>
> Thanks!
> Carlos
>
> [1] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.
> org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_testing_-2
> 3junit-2Drule-2Dminiclusterwithclientresource&d=DwIFaQ&c=eIGjsITfXP_y-
> DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCzxlSpEZgTNbdEC4jbNL0iaPB
> qIxifg&m=HzoMy2jOOV5oDa2SuDfcm4cy2LK-9DOQ530Dmvi2r4f0jhOVKmqOTToQ6ArM3
> q1F&s=RKTpSSHRudC_BMmTz9xhGOT91uAAbp7HPEejuTihHvU&e=
>
> -----Original Message-----
> From: Qingsheng Ren <re...@gmail.com>
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski <pi...@domagalski.com>
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a detailed description about it. The original proposal FLIP-27 [2] is also a good reference.
>
> [1] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__nightlies.apache.
> org_flink_flink-2Ddocs-2Drelease-2D1.15_docs_dev_datastream_sources_&d
> =DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDIuCz
> xlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jXU50
> aRVltFNinifOKvurHPTzdPL1da&s=lQGFDQJRG2BADprHFhkCefHCPTjDTh-OGIz4xFl-1
> W8&e= [2] 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_
> confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterfac
> e&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhiDI
> uCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9jX
> U50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZv5
> V3MIQ&e=
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >
> > P.S. I actually struggled for a bit trying to understand why my refactored solution which accepts DataStream<> wouldn't work ("no operators defined in the streaming topology"). Turns out, my assumption that I can call StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get the same environment, was wrong. I had env.addSource and env.fromSource calls using one instance of the environment, but then called env.execute() on another instance :facepalm:
> >
> > On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <re...@gmail.com> wrote:
> > Hi Piotr,
> >
> > I’d like to share my understanding about this. Source and SourceFunction are both interfaces to data sources. SourceFunction was designed and introduced earlier and as the project evolved, many shortcomings emerged. Therefore, the community re-designed the source interface and introduced the new Source API in FLIP-27 [1].
> >
> > Finally we will deprecate the SourceFunction and use Source as the only interface for all data sources, but considering the huge cost of migration you’ll see SourceFunction and Source co-exist for some time, like the ParallelTestSource you mentioned is still on SourceFunction, and KafkaSource as a pioneer has already migrated to the new Source API.
> >
> > I think the API to end users didn't change a lot: both env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, and you could apply downstream transformations onto it.
> >
> > [1]
> > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.or
> > g_ 
> > confluence_display_FLINK_FLIP-2D27-253A-2BRefactor-2BSource-2BInterf
> > ac 
> > e&d=DwIFaQ&c=eIGjsITfXP_y-DLLX0uEHXJvU8nOHrUK8IrwNKOtkVU&r=Q6157zGhi
> > DI 
> > uCzxlSpEZgTNbdEC4jbNL0iaPBqIxifg&m=UaxcpZWDroSZiLenzhGnRRQockpCt3Hg9
> > jX
> > U50aRVltFNinifOKvurHPTzdPL1da&s=SqQEnABQt5ZGeX8rUZVEI8wyNDe2GlRNBHtZ
> > v5
> > V3MIQ&e=
> >
> > Cheers,
> >
> > Qingsheng
> >
> > > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >
> > > Hi Ken,
> > >
> > > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and being still confused about differences between Source, SourceFunction, DataStream, DataStreamOperator, etc.
> > >
> > > I think the DataStream<> type is what I'm looking for? That is, then I can use:
> > >
> > > DataStream<EventData> source =
> > >     env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > > when using KafkaSource in the normal setup
> > >
> > > and
> > > DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > > when using the testing source [1]
> > >
> > > Does that sound right?
> > >
> > > [1]
> > > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > >
> > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kk...@transpac.com> wrote:
> > > Hi Piotr,
> > >
> > > The way I handle this is via a workflow class that uses a builder approach to specifying inputs, outputs, and any other configuration settings.
> > >
> > > The inputs are typically DataStream<xxx>.
> > >
> > > This way I can separate out the Kafka inputs, and use testing sources that give me very precise control over the inputs (e.g. I can hold up on right side data to ensure my stateful left join junction is handling deferred joins properly). I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.
> > >
> > > Then in the actual tool class (with a main method) I’ll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.
> > >
> > > — Ken
> > >
> > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> > >>
> > >> Hi,
> > >>
> > >> I'm wondering: what is the recommended way to structure the job which one would like to test later on with `MiniCluster`.
> > >>
> > >> I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource`, which is then passed to `env.fromSource(...)`.
> > >>
> > >> Is there any recommended way of handling these discrepancies, ie. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> > >>
> > >> [1]
> > >> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> > >>
> > >> --
> > >> Piotr Domagalski
> > >
> > > --------------------------
> > > Ken Krugler
> > > http://www.scaleunlimited.com
> > > Custom big data solutions
> > > Flink, Pinot, Solr, Elasticsearch
> > >
> > >
> > >
> > >
> > >
> > > --
> > > Piotr Domagalski
> >
> >
> >
> > --
> > Piotr Domagalski
>
>
> ________________________________
>
> This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy. Your privacy is important to us. Accenture uses your personal data only in compliance with data protection laws. For further information on how Accenture processes your personal data, please see our privacy statement at https://www.accenture.com/us-en/privacy-policy.
> ______________________________________________________________________________________
>
> http://www.accenture.com
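The pluggable-source/sink structure Carlos describes can be sketched roughly as follows. This is a minimal illustration against Flink 1.15 APIs, not the actual Accenture code: the generic parameters, the field names, and the abstract `transform` hook are all assumptions based on the description above.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch of a job class with pluggable sources and sinks (new Source/Sink APIs).
// The production main() would inject a KafkaSource/KafkaSink; JUnit tests inject
// test implementations instead, so the pipeline code itself is never duplicated.
public abstract class AbstractDataStreamJob<IN, OUT> {

    private final Source<IN, ?, ?> source;
    private final Sink<OUT> sink;

    protected AbstractDataStreamJob(Source<IN, ?, ?> source, Sink<OUT> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Subclasses implement the actual pipeline between source and sink.
    protected abstract DataStream<OUT> transform(DataStream<IN> input);

    public void run(StreamExecutionEnvironment env) throws Exception {
        DataStream<IN> input =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "input");
        transform(input).sinkTo(sink);
        env.execute();
    }
}
```

Only the construction of the concrete `Source`/`Sink` differs between production and test; everything in between is exercised unchanged by the MiniCluster tests.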

Re: [External] Re: Source vs SourceFunction and testing

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Carlos,

FLIP-238 [1] is proposing a FLIP-27-based data generator source and I
think this is what you are looking for. This FLIP was created just
days ago so it may take some time to get accepted and released.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Cheers,

Qingsheng
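Until such a source ships, one possible interim workaround (a sketch, assuming Flink 1.15, where the built-in `NumberSequenceSource` already implements the new Source API) is to drive the test elements off a bounded number sequence; the class name and helper method below are illustrative, not part of any Flink API:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestElementsSourceSketch {

    // Builds a bounded DataStream from a fixed list using only new-Source-API classes.
    // Caveats: the list must be serializable (it is captured by the map closure), and
    // element order is only guaranteed with parallelism 1.
    public static DataStream<String> fromElements(
            StreamExecutionEnvironment env, List<String> elements) {
        return env.fromSource(
                        new NumberSequenceSource(0, elements.size() - 1),
                        WatermarkStrategy.noWatermarks(),
                        "test-elements")
                .setParallelism(1)
                .map(index -> elements.get(index.intValue()));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        fromElements(env, Arrays.asList("a", "b", "c")).print();
        env.execute("from-elements-sketch");
    }
}
```

Because `NumberSequenceSource` is bounded, the job terminates on its own, which is what the MiniCluster end-to-end tests need.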

> -----Original Message-----
> From: Qingsheng Ren <re...@gmail.com>
> Sent: miércoles, 25 de mayo de 2022 12:10
> To: Piotr Domagalski <pi...@domagalski.com>
> Cc: user@flink.apache.org
> Subject: [External] Re: Source vs SourceFunction and testing
>
> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with links and attachments.
>
> Glad to see you have resolved the issue!
>
> If you want to learn more about the Source API, the Flink document [1] has a detailed description about it. The original proposal FLIP-27 [2] is also a good reference.
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Thank you Qingsheng, this context helps a lot!
> >
> > And once again thank you all for being such a helpful community!
> >