Posted to dev@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2019/10/02 21:32:05 UTC

Testing DataStreams

Hello,
I have a question about some odd behavior I am observing. In the
documentation [1], the FlinkMiniCluster example suggests that we can
expect the results to appear in the same order in which they were injected
into the stream with *fromElements()*. The Java version of the code uses
assertEquals on lists, which passes only if both lists contain the same
elements in the same order. The Scala version, on the other hand, uses a
different matcher that only asserts that all elements are actually present
in the list.
So I have two questions:

1) For the code below, can we be sure that the output will have the same
order as the input?
For some reason the code returns the elements in a fairly random order in
the sink. I was sure that this was the expected behavior, but this piece
of documentation made me wonder.

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.fromElements("A", "B", "C", "D", "E", "F")
    .addSink(new TestSink)
env.execute()

class TestSink extends SinkFunction[String] {
  override def invoke(value: String): Unit =
    synchronized { TestSink.vals.add(value) }
}

object TestSink {
  val vals = new ConcurrentLinkedQueue[String]()
}

2) Is the order guaranteed to be kept for an env with parallelism = 1?
Suppose I want to test a function, or a set of functions, that depends on
the order of the events, for example detecting the beginning and the end
of a pattern. Can I somehow ensure the order for testing purposes?


Best Regards,
Dom.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

Re: Testing DataStreams

Posted by Chesnay Schepler <ch...@apache.org>.
I've filed FLINK-14335 for fixing the java example.

On 07/10/2019 11:08, Chesnay Schepler wrote:
> Are you referring to "ExampleIntegrationTest"? If so, then I'm afraid 
> this test is slightly misleading since the order isn't guaranteed in 
> this case.
>
> 1) As long as the parallelism of the sink is 1 the elements should 
> arrive in order.
>
> 2) The order is maintained if parallelism=1 since elements cannot 
> overtake each other in a single stream.
>
> If the parallelism is increased by a subsequent operation O1, then the 
> individual subtasks of O1 will still see a sorted stream.
> If an operation O2 after O1 has a lower parallelism than O1 then it 
> will not see a sorted stream, since the outputs of O1-subtasks may 
> interleave at will. This is the reason why the 
> "ExampleIntegrationTest" is incorrect; while the 2 sink instances 
> receive a sorted input they are adding them into a single collection, 
> interleaving data.
>
> This is fine:
>
> env.fromElements(1L, 21L, 22L)
>    .map(x -> x * 2)
>    .setParallelism(2)
>    <apply order-dependent operation>
>    .setParallelism(2);
>
> This is not:
>
> env.fromElements(1L, 21L, 22L)
>    .map(x -> x * 2)
>    .setParallelism(2)
>    <apply order-dependent operation>
>    .setParallelism(1);
>
> In other words, if you never reduce the parallelism your functions 
> should be fine.
> If you have to reduce the parallelism, then you must re-sort the stream
> (or, more realistically, window it) somehow.


Re: Testing DataStreams

Posted by Chesnay Schepler <ch...@apache.org>.
Are you asking how the elements from the source are distributed across 
the subtasks of the next operation?

That differs a bit across operations; in this case (a map/sink after a 
source) AFAIK they are distributed in a round-robin manner.
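
As a rough illustration of what round-robin distribution means here, the
following is a plain-Java model, not Flink code. The class name
RoundRobinModel and the method distribute are hypothetical, and starting at
subtask 0 is an arbitrary choice of this model; the real runtime may start
elsewhere. It only shows that each subtask receives a subsequence of the
source that is still in the original order.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of round-robin distribution (assumption: no Flink involved). */
public class RoundRobinModel {

    /** Deals the elements to `parallelism` subtasks in turn, like dealing cards. */
    public static <T> List<List<T>> distribute(List<T> elements, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        // Element i goes to subtask (i mod parallelism); starting at 0 is a model choice.
        for (int i = 0; i < elements.size(); i++) {
            subtasks.get(i % parallelism).add(elements.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<List<String>> subtasks =
                distribute(List.of("A", "B", "C", "D", "E", "F"), 2);
        System.out.println(subtasks); // prints [[A, C, E], [B, D, F]]
    }
}
```

Within each inner list the relative order of the source is preserved, which
is why the individual subtasks still see a sorted stream.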

On 07/10/2019 11:26, Dominik Wosiński wrote:
> Thanks a lot for your answer, Chesnay. I have one more question:
> `fromElements` creates the stream with parallelism 1, but I can see that
> the env created on my local machine has a default parallelism of 12. So I
> assume that the entries from the stream are distributed to the first
> operators according to some scheme? Or am I missing something?
>
> Thanks in advance,
> Best Regards,
> Dom.


Re: Testing DataStreams

Posted by Dominik Wosiński <wo...@gmail.com>.
Thanks a lot for your answer, Chesnay. I have one more question:
`fromElements` creates the stream with parallelism 1, but I can see that
the env created on my local machine has a default parallelism of 12. So I
assume that the entries from the stream are distributed to the first
operators according to some scheme? Or am I missing something?

Thanks in advance,
Best Regards,
Dom.


Re: Testing DataStreams

Posted by Chesnay Schepler <ch...@apache.org>.
Are you referring to "ExampleIntegrationTest"? If so, then I'm afraid 
this test is slightly misleading since the order isn't guaranteed in 
this case.

1) As long as the parallelism of the sink is 1 the elements should 
arrive in order.

2) The order is maintained if parallelism=1 since elements cannot 
overtake each other in a single stream.

If the parallelism is increased by a subsequent operation O1, then the 
individual subtasks of O1 will still see a sorted stream.
If an operation O2 after O1 has a lower parallelism than O1 then it will 
not see a sorted stream, since the outputs of O1-subtasks may interleave 
at will. This is the reason why the "ExampleIntegrationTest" is 
incorrect; while the 2 sink instances receive a sorted input they are 
adding them into a single collection, interleaving data.

This is fine:

env.fromElements(1L, 21L, 22L)
    .map(x -> x * 2)
    .setParallelism(2)
    <apply order-dependent operation>
    .setParallelism(2);

This is not:

env.fromElements(1L, 21L, 22L)
    .map(x -> x * 2)
    .setParallelism(2)
    <apply order-dependent operation>
    .setParallelism(1);

In other words, if you never reduce the parallelism your functions 
should be fine.
If you have to reduce the parallelism, then you must re-sort the stream
(or, more realistically, window it) somehow.
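
To make the interleaving concrete, here is a small plain-Java model, again
not Flink code. The class InterleavingModel, its methods, and the explicit
schedule array are all hypothetical; the schedule merely stands in for the
nondeterministic timing of two parallel subtasks writing into one shared
collection. The sketch assumes the inputs 1, 21, 22 were doubled by the map
and split across two subtasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of two sorted subtask outputs merged into one collection. */
public class InterleavingModel {

    /**
     * Merges two per-subtask outputs into one shared list. `schedule` says
     * which subtask (0 or 1) emits next; it replaces real thread timing.
     */
    public static List<Long> merge(List<Long> subtask0, List<Long> subtask1, int[] schedule) {
        List<Long> shared = new ArrayList<>();
        int next0 = 0;
        int next1 = 0;
        for (int who : schedule) {
            shared.add(who == 0 ? subtask0.get(next0++) : subtask1.get(next1++));
        }
        return shared;
    }

    /** Returns a sorted copy, i.e. the "re-sort" step before an ordered assertion. */
    public static List<Long> sorted(List<Long> values) {
        List<Long> copy = new ArrayList<>(values);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<Long> subtask0 = List.of(2L, 44L); // each subtask's output is sorted
        List<Long> subtask1 = List.of(42L);
        List<Long> shared = merge(subtask0, subtask1, new int[] {0, 0, 1});
        System.out.println(shared);         // prints [2, 44, 42]
        System.out.println(sorted(shared)); // prints [2, 42, 44]
    }
}
```

Under this model an order-sensitive assertion on the shared list can fail
even though every subtask emitted sorted output, while an assertion on the
sorted copy (or any order-insensitive matcher) passes for every schedule.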
