Posted to user@flink.apache.org by "Sofer, Tovi " <to...@citi.com> on 2017/12/07 16:54:25 UTC

Testing CoFlatMap correctness

Hi group,

What is the best practice for testing CoFlatMap operator correctness?
We have two source functions, each emitting data to its own stream, and we connect the two streams. I want to make sure that when an element from streamA arrives before an element from the other stream, a certain behavior happens.
How can I test this case?

Using env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and emitting a timestamp and watermark with each element didn't help; the elements still arrive in an unexpected order.



Thanks in advance,

Tovi




Re: Testing CoFlatMap correctness

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tovi,

testing the behavior of a data flow with respect to the order of records
from different sources is tricky.
Source functions work independently of each other, and it is not
easy to control the order in which records are shipped (and
received) across source functions.

You could implement source functions that only emit records (and possibly
watermarks) when being triggered from your test code.
Having two of these sources in a program, you could choose in which order
the sources emit records and watermarks.
However, you would need to ensure that a record is completely processed
(i.e., the program has come to a halt) before you emit the next record (from the
same or another source), to avoid race conditions.
You could do this with timeouts, but this is very fragile and I would not
recommend it.
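The trigger-driven idea can be modeled without Flink at all. The following is a minimal plain-Java sketch under stated assumptions: ManualSource, RuleApplier, and the "ruleA"/5 payloads are hypothetical names, not Flink classes. Here emit() calls the downstream logic synchronously on the test thread, so each record is fully processed before emit() returns; that is exactly the guarantee a real multi-threaded Flink job does not give you for free, where a source calls SourceContext.collect() from its own run() loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ManualSourceSketch {

    /** Hypothetical test-driven source: emits a record only when the test calls emit(). */
    static final class ManualSource<T> {
        private final Consumer<T> downstream;

        ManualSource(Consumer<T> downstream) {
            this.downstream = downstream;
        }

        /** Hands one record to downstream synchronously; returns once it has been processed. */
        void emit(T record) {
            downstream.accept(record);
        }
    }

    /** Hypothetical two-input logic standing in for a CoFlatMapFunction. */
    static final class RuleApplier {
        private String rule = null;                   // latest control-stream value, if any
        final List<String> out = new ArrayList<>();   // collected output records

        void onControl(String newRule) {              // role of flatMap1 (control stream)
            rule = newRule;
        }

        void onMain(Integer value) {                  // role of flatMap2 (main stream)
            out.add((rule == null ? "no-rule" : rule) + ":" + value);
        }
    }

    /** Feeds the same two records in a test-chosen order and returns the output. */
    static List<String> run(boolean controlFirst) {
        RuleApplier op = new RuleApplier();
        ManualSource<String> controlSource = new ManualSource<>(op::onControl);
        ManualSource<Integer> mainSource = new ManualSource<>(op::onMain);
        if (controlFirst) {
            controlSource.emit("ruleA");
            mainSource.emit(5);
        } else {
            mainSource.emit(5);
            controlSource.emit("ruleA");
        }
        return op.out;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [ruleA:5]
        System.out.println(run(false));  // [no-rule:5]
    }
}
```

Because the test alone decides the interleaving, both orders (control before main, and main before control) can be asserted deterministically without timeouts.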

Btw. watermarks have no effect on the order in which records are processed.
They only determine the current event time of an operator.
In the case of a co-operator, this is the smaller of the two input streams'
watermarks. So, the input records of a co-operator are not aligned or
held back based on their timestamps or watermarks.
Instead, an operator can put "early" records into state and process them
when a later watermark arrives, which signals that all relevant records from
both inputs have been received.
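The buffering pattern described above can be modeled in plain Java. This is a simplified sketch, not Flink's operator API: WatermarkBufferSketch and its methods are hypothetical names that only mirror the shape of the test-harness calls. It tracks each input's watermark, treats the minimum of the two as the operator's event time, and releases buffered records from both inputs in timestamp order once that combined watermark passes them. In an actual Flink job you would more likely keep the buffer in state and use event-time timers, for example in a CoProcessFunction, instead of a local PriorityQueue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkBufferSketch {

    /** A buffered record: source input (1 or 2), event timestamp, arrival order, payload. */
    static final class Buffered {
        final int input;
        final long timestamp;
        final long seq;
        final String value;

        Buffered(int input, long timestamp, long seq, String value) {
            this.input = input;
            this.timestamp = timestamp;
            this.seq = seq;
            this.value = value;
        }
    }

    private long watermark1 = Long.MIN_VALUE;   // latest watermark seen on input 1
    private long watermark2 = Long.MIN_VALUE;   // latest watermark seen on input 2
    private long eventTime = Long.MIN_VALUE;    // operator event time = min of both
    private long nextSeq = 0;                   // tie-breaker for equal timestamps
    private final PriorityQueue<Buffered> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Buffered b) -> b.timestamp)
                      .thenComparingLong(b -> b.seq));
    final List<String> out = new ArrayList<>();

    void processElement1(String value, long timestamp) {
        buffer.add(new Buffered(1, timestamp, nextSeq++, value));
    }

    void processElement2(String value, long timestamp) {
        buffer.add(new Buffered(2, timestamp, nextSeq++, value));
    }

    void processWatermark1(long watermark) {
        watermark1 = Math.max(watermark1, watermark);
        advance();
    }

    void processWatermark2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        advance();
    }

    /** Releases, in timestamp order, every buffered record at or below the combined watermark. */
    private void advance() {
        long combined = Math.min(watermark1, watermark2);
        if (combined <= eventTime) {
            return;   // the slower input has not moved forward yet
        }
        eventTime = combined;
        while (!buffer.isEmpty() && buffer.peek().timestamp <= eventTime) {
            Buffered b = buffer.poll();
            out.add("in" + b.input + "@" + b.timestamp + ":" + b.value);
        }
    }

    public static void main(String[] args) {
        WatermarkBufferSketch op = new WatermarkBufferSketch();
        op.processElement2("main-event", 13);     // arrives first, later timestamp
        op.processElement1("control-event", 12);  // arrives second, earlier timestamp
        op.processWatermark1(20);                 // input 2 still at Long.MIN_VALUE
        System.out.println(op.out);               // []
        op.processWatermark2(20);                 // combined watermark is now 20
        System.out.println(op.out);               // [in1@12:control-event, in2@13:main-event]
    }
}
```

Advancing the watermark on only one input releases nothing, which mirrors the point that a co-operator's event time is bounded by its slower input.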

I hope this helps,
Fabian


RE: Testing CoFlatMap correctness

Posted by "Sofer, Tovi " <to...@citi.com> on 2017-12-10 10:04 GMT+01:00.
Hi Kostas,

Thank you for the suggestion.
But in our case we want to write either a component test that involves several steps, where the CoFlatMap is one step in the middle, or an integration test that tests the whole flow, which also involves the CoFlatMap.
We are trying to understand how to test such a scenario so that the results are predictable, i.e., so that elements from the main stream arrive after elements from the control stream, or the other way around.

Thanks again,
Tovi


Re: Testing CoFlatMap correctness

Posted by Kostas Kloudas <k....@data-artisans.com> on Thursday, 07 December 2017 19:11.
Hi Tovi,

What you need is the TwoInputStreamOperatorTestHarness. This will allow you to do something like:

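// Imports; the harness ships in Flink's flink-streaming-java test-jar.
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;

// myoperator: the two-input operator under test. For a CoFlatMapFunction, wrap it,
// e.g. new CoStreamFlatMap<>(myCoFlatMapFunction) (myCoFlatMapFunction is a placeholder).
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
      new TwoInputStreamOperatorTestHarness<>(myoperator);

testHarness.setup();
testHarness.open();

// Advance both inputs to event time 17, then feed input 1 (value 5, timestamp 12).
testHarness.processWatermark1(new Watermark(17));
testHarness.processWatermark2(new Watermark(17));
testHarness.processElement1(new StreamRecord<>(5, 12L));

// Advance both inputs to event time 42, then feed input 2 (value "6", timestamp 13).
testHarness.processWatermark1(new Watermark(42));
testHarness.processWatermark2(new Watermark(42));
testHarness.processElement2(new StreamRecord<>("6", 13L));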

and then use testHarness.getOutput() to get the output and compare it against the expected one.

If you have access to the Flink source code, I would recommend having a look at the CoProcessOperatorTest for an example.

Otherwise, you can find it here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java

Hope this helps,
Kostas