Posted to user@flink.apache.org by Steven Ruppert <st...@fullcontact.com> on 2017/01/23 22:19:33 UTC

TestStreamEnvironment: await last flush of processing time-based windows

Hi,

I'm attempting to unit test Flink with the flink-test-utils support, on
Flink 1.1.4. I've got basic flatMap stuff flowing through just fine,
but when running any processing-time-based windowing functions,
`env.execute()` will return before any values are flushed out of the
windows.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class TestMinimal {
    static AtomicBoolean sinked = new AtomicBoolean(false);
    @Test
    public void testThing() throws Exception {
        StreamExecutionEnvironment env =
            TestStreamEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
            .keyBy(0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .sum(1)
            .addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value) throws Exception {
                    sinked.set(true);
                }
            });
        env.execute();
        // presumably once execute returns, all elements have passed through all operators.
        assertTrue(sinked.get());
    }
}

Is there a way to make this test pass?

Using event time windows instead does seem to work, but processing
time would be a little more convenient.
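For comparison, here is a sketch of the event-time variant that the message says does work. It assumes the Flink 1.1.x APIs `setStreamTimeCharacteristic`, `assignTimestampsAndWatermarks` and `AscendingTimestampExtractor` (from `org.apache.flink.streaming.api.functions.timestamps`); the class name `TestMinimalEventTime` and the millisecond timestamps carried in the third tuple field are made up for illustration. The likely reason event time behaves differently: when a bounded source such as `fromElements` finishes, Flink emits a final `Long.MAX_VALUE` watermark that fires every pending event-time window before the job ends, while pending processing-time timers get no comparable signal.

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

// Hypothetical class name; same pipeline as TestMinimal, but on event time.
public class TestMinimalEventTime {
    static AtomicBoolean sinked = new AtomicBoolean(false);

    @Test
    public void testThing() throws Exception {
        StreamExecutionEnvironment env =
            TestStreamEnvironment.getExecutionEnvironment();
        // Windows now fire on watermarks instead of wall-clock timers.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // f2 is an illustrative event timestamp in ms; both fall in [1000, 2000).
        env.fromElements(Tuple3.of("a", 1, 1000L), Tuple3.of("a", 1, 1500L))
            .assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple3<String, Integer, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<String, Integer, Long> element) {
                        return element.f2;
                    }
                })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(1)))
            .sum(1)
            .addSink(new SinkFunction<Tuple3<String, Integer, Long>>() {
                @Override
                public void invoke(Tuple3<String, Integer, Long> value) throws Exception {
                    sinked.set(true);
                }
            });
        // The end-of-input watermark should flush the window before execute() returns.
        env.execute();
        assertTrue(sinked.get());
    }
}
```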

-- 
*CONFIDENTIALITY NOTICE: This email message, and any documents, files or 
previous e-mail messages attached to it is for the sole use of the intended 
recipient(s) and may contain confidential and privileged information. Any 
unauthorized review, use, disclosure or distribution is prohibited. If you 
are not the intended recipient, please contact the sender by reply email 
and destroy all copies of the original message.*

Re: TestStreamEnvironment: await last flush of processing time-based windows

Posted by Steven Ruppert <st...@fullcontact.com>.
Thanks for the clarification. I'm not familiar enough with the
internals of Flink to offer any technical suggestions, but it'd be
nice to have some more documentation around testing Flink and possible
pitfalls like this.

For anybody with the same issue, note that IngestionTime also works,
and is slightly easier to use with unit tests that don't care about
event time. Also, you can copy and modify the FromElementsFunction and
add a Thread.sleep after it emits all the test inputs. If you pause
long enough for the downstream processing-time windows to end, they
will fire. Obviously not a great solution, but useful if you can't use
ingestion time instead.
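Both workarounds might look roughly like this, again assuming Flink 1.1.x. `TestWindowWorkarounds` and `SleepingTupleSource` are hypothetical names: the source is a hand-written stand-in for a modified `FromElementsFunction` that hard-codes the two test tuples, and the 3000 ms sleep is an arbitrary value picked to exceed the 1-second window. The ingestion-time test uses `KeyedStream.timeWindow`, which selects event-time window assigners whenever the time characteristic is not processing time, so the end-of-input watermark should flush them.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertTrue;

public class TestWindowWorkarounds {
    static final AtomicBoolean ingestionSinked = new AtomicBoolean(false);
    static final AtomicBoolean sleepSinked = new AtomicBoolean(false);

    // Workaround 1: ingestion time. Sources attach timestamps and watermarks.
    @Test
    public void testIngestionTime() throws Exception {
        StreamExecutionEnvironment env = TestStreamEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 1))
            .keyBy(0)
            .timeWindow(Time.seconds(1)) // event-time assigner under IngestionTime
            .sum(1)
            .addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value) throws Exception {
                    ingestionSinked.set(true);
                }
            });
        env.execute();
        assertTrue(ingestionSinked.get());
    }

    // Workaround 2: hypothetical helper (not a Flink class) that emits the test
    // inputs, then sleeps so downstream processing-time timers can fire.
    static class SleepingTupleSource implements SourceFunction<Tuple2<String, Integer>> {
        private final long sleepMillis;
        private volatile boolean running = true;

        SleepingTupleSource(long sleepMillis) {
            this.sleepMillis = sleepMillis;
        }

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(Tuple2.of("a", 1));
                ctx.collect(Tuple2.of("a", 1));
            }
            // Keeps the job alive past the window end; a guess, not a guarantee.
            if (running) {
                Thread.sleep(sleepMillis);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    @Test
    public void testSleepingSource() throws Exception {
        StreamExecutionEnvironment env = TestStreamEnvironment.getExecutionEnvironment();

        env.addSource(new SleepingTupleSource(3000))
            .keyBy(0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .sum(1)
            .addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value) throws Exception {
                    sleepSinked.set(true);
                }
            });
        env.execute();
        assertTrue(sleepSinked.get());
    }
}
```

The sleeping source makes the test slower and timing-dependent: if the pause is shorter than the window size plus scheduling overhead, the assertion can fail intermittently. That is why ingestion time is the simpler choice when the pipeline does not need true processing time.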

On Tue, Jan 24, 2017 at 9:28 AM, Aljoscha Krettek <al...@apache.org> wrote:
> Hi,
> I'm afraid there is no way of making this work with the current
> implementation. Getting this to work in a distributed setting seems
> especially hard.
>
> I'm very open to suggestions on this topic, though. :-)
>
> Cheers,
> Aljoscha

Re: TestStreamEnvironment: await last flush of processing time-based windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm afraid there is no way of making this work with the current
implementation. Getting this to work in a distributed setting seems
especially hard.

I'm very open to suggestions on this topic, though. :-)

Cheers,
Aljoscha
