Posted to user@flink.apache.org by Avi Levi <av...@bluevoyant.com> on 2019/03/19 12:11:07 UTC

ingesting time for TimeCharacteristic.IngestionTime on unit test

Hi,
Our stream is not based on a time sequence and we do not use time-based
operations. We do want to clean the state after x days, hence we fire a timer
event. My problem is that our unit test fires the event immediately (there
is no ingestion time). How can I inject ingestion time?

Cheers
Avi
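
A hypothetical, Flink-free model of the pattern described above may help pin the question down. In it, a key is kept in state the first time it is seen, and a cleanup is scheduled x days later. SeenKeysFilter and its methods are invented for illustration and are not Flink API. The design choice that matters is that the caller passes in the current time instead of the code reading a clock. That way a test decides exactly when the cleanup happens.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, Flink-free model (not Flink API) of the operator described above:
// a key is emitted the first time it is seen, remembered, and forgotten once its
// cleanup time has been reached. The caller passes the current time in, so a unit
// test fully controls when cleanup happens.
class SeenKeysFilter {
    private final long ttlMillis;
    // key -> cleanup timestamp; stands in for keyed state plus a registered timer
    private final Map<String, Long> cleanupAt = new HashMap<>();

    SeenKeysFilter(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the key if it is new and records when its state should be cleared.
    Optional<String> processElement(String key, long now) {
        if (cleanupAt.containsKey(key)) {
            return Optional.empty();         // already seen: filter it out
        }
        cleanupAt.put(key, now + ttlMillis); // "register" the cleanup timer
        return Optional.of(key);
    }

    // Called when time reaches `timestamp`; clears every key whose cleanup is due.
    void onTimer(long timestamp) {
        cleanupAt.values().removeIf(t -> t <= timestamp);
    }

    boolean isTracked(String key) {
        return cleanupAt.containsKey(key);
    }
}
```

In a real KeyedProcessFunction, the map would become keyed state and the cleanup would be a timer registered through ctx.timerService(). Only the decision logic is modelled here.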

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Avi,

Good to hear that!

Cheers,
Kostas

On Mon, Mar 25, 2019 at 3:37 PM Avi Levi <av...@bluevoyant.com> wrote:

> Thanks, I'll check it out. I got a bit confused by the ingestion time being
> null in tests, but all is OK now. I appreciate it.
>
> On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas <kk...@gmail.com> wrote:
>
>> Hi Avi,
>>
>> Just to verify your ITCase, I wrote the following dummy example and it
>> seems to be "working" (i.e. I can see non-null timestamps and timers firing).
>>
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> env.setParallelism(1);
>>
>> env
>>       .addSource(new LongSource())
>>       .keyBy(elmnt -> elmnt)
>>       .process(new KeyedProcessFunction<Long, Long, Long>() {
>>
>>          @Override
>>          public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
>>
>>
>>             long timestamp = ctx.timestamp();
>>             long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();
>>
>>             System.out.println(ctx.timestamp() + " " + timerTimestamp);
>>
>>             ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
>>          }
>>
>>          @Override
>>          public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>             System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain());
>>          }
>>       }).print();
>> env.execute();
>>
>> The source is:
>>
>> private static final class LongSource implements SourceFunction<Long> {
>>
>>    private volatile boolean running = true;
>>
>>    private long element = 0L;
>>
>>    @Override
>>    public void run(SourceContext<java.lang.Long> ctx) throws Exception {
>>       while (running) {
>>          ctx.collect(element++ % 10);
>>          Thread.sleep(10L);
>>       }
>>    }
>>
>>    @Override
>>    public void cancel() {
>>       running = false; // lets run() leave its emit loop
>>    }
>> }
>>
>>
>> Could you provide more details on how your use case differs from the above
>> dummy example so that we can pin down the problem?
>>
>> As a side-note, ingestion time is essentially event time, the only
>> difference being that the timestamp assigner at the beginning gives each
>> element the timestamp System.currentTimeMillis(). So in this case, maybe you
>> could also consider event-time timers, but then keep in mind your
>> watermark emission interval.
>>
>> In addition, if you simply want to check the processing-time behaviour of
>> your operator (not the whole pipeline), you could make use of the
>> OneInputStreamTaskTestHarness or its keyed variant. This allows you to
>> provide your own processing time provider, thus allowing you to
>> deterministically test processing-time behaviour.
>>
>> Cheers,
>> Kostas
>>
>>
>>
>> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <av...@bluevoyant.com> wrote:
>>
>>> Any idea what I should do to overcome this?
>>>
>>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <av...@bluevoyant.com>
>>> wrote:
>>>
>>>> Hi Andrey,
>>>> I am testing a Filter operator that receives a key from the stream and
>>>> checks whether it is a new one or not. If it is new, it keeps it in state
>>>> and fires a timer; all of that is done using the ProcessFunction.
>>>> The test uses a CollectSink as described here
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing> and
>>>> the source is an implementation of SourceFunction that accepts a
>>>> collection of values and passes each of them to ctx.collect.
>>>> ctx.timestamp() is null, BUT even if I set the timer to some time in
>>>> the future with ctx.timerService.registerProcessingTimeTimer(currenttimestamp +
>>>> x), the timer fires immediately.
>>>>
>>>>
>>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <an...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> What is the structure of your unit test? Do you create some source and
>>>>> then apply the function, or do you test only the ProcessFunction methods
>>>>> in isolation? Does ctx.timestamp() return zero, or which value?
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>>
>>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <av...@bluevoyant.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey,
>>>>>> I'm using IngestionTime:
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>>
>>>>>> This is my timer in processElement:
>>>>>>    val nextTime: Long = ctx.timestamp() + daysInMilliseconds(14)
>>>>>>    ctx.timerService.registerProcessingTimeTimer(nextTime)
>>>>>>
>>>>>> The problem is: how do I use this in my unit tests? Since there is no
>>>>>> ingestion time there, the timers fire immediately, so the timer actions
>>>>>> (such as state cleanup) run too early and cause the tests to fail.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <an...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> Do you use a processing-time timer
>>>>>>> (timerService().registerProcessingTimeTimer)?
>>>>>>> Why do you need ingestion time? Do you
>>>>>>> set TimeCharacteristic.IngestionTime?
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <av...@bluevoyant.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Our stream is not based on a time sequence and we do not use time-based
>>>>>>>> operations. We do want to clean the state after x days, hence we fire a
>>>>>>>> timer event. My problem is that our unit test fires the event immediately
>>>>>>>> (there is no ingestion time). How can I inject ingestion time?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Avi
>>>>>>>>
>>>>>>>>

Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Posted by Avi Levi <av...@bluevoyant.com>.
Thanks, I'll check it out. I got a bit confused by the ingestion time being
null in tests, but all is OK now. I appreciate it.
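
On the null timestamp: ctx.timestamp() returns a boxed Long. That value can be null when elements carry no timestamp, for example under processing time. In that case an expression such as ctx.timestamp() + daysInMilliseconds(14) throws a NullPointerException on unboxing. One possible workaround, sketched below, is to fall back to the current processing time. In Flink that value comes from ctx.timerService().currentProcessingTime(). CleanupTimes is a hypothetical helper that takes both values as plain parameters, so it can be tested without Flink. For reference, 14 days is 14 * 86,400,000 = 1,209,600,000 ms.

```java
import java.time.Duration;

// Hypothetical helper, not Flink API. Inside a KeyedProcessFunction the first two
// arguments would come from ctx.timestamp() (a java.lang.Long that may be null)
// and ctx.timerService().currentProcessingTime().
final class CleanupTimes {
    private CleanupTimes() {}

    // Timestamp for a processing-time cleanup timer: the element timestamp plus the
    // TTL, falling back to the current processing time when there is no timestamp
    // (instead of failing with a NullPointerException on unboxing).
    static long cleanupTimestamp(Long elementTimestamp, long currentProcessingTime, Duration ttl) {
        long base = (elementTimestamp != null) ? elementTimestamp : currentProcessingTime;
        return base + ttl.toMillis();
    }
}
```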


Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Avi,

Just to verify your ITCase, I wrote the following dummy example and it
seems to be "working" (i.e. I can see non-null timestamps and timers firing).


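import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();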
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);

env
      .addSource(new LongSource())
      .keyBy(elmnt -> elmnt)
      .process(new KeyedProcessFunction<Long, Long, Long>() {

         @Override
         public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {


            long timestamp = ctx.timestamp();
            long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();

            System.out.println(ctx.timestamp() + " " + timerTimestamp);

            ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
         }

         @Override
         public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain());
         }
      }).print();
env.execute();

The source is:

private static final class LongSource implements SourceFunction<Long> {

   private volatile boolean running = true;

   private long element = 0L;

   @Override
   public void run(SourceContext<java.lang.Long> ctx) throws Exception {
      while (running) {
         ctx.collect(element++ % 10);
         Thread.sleep(10L);
      }
   }

   @Override
   public void cancel() {
      running = false; // lets run() leave its emit loop
   }
}


Could you provide more details on how your use case differs from the above
dummy example so that we can pin down the problem?

As a side-note, ingestion time is essentially event time, the only
difference being that the timestamp assigner at the beginning gives each
element the timestamp System.currentTimeMillis(). So in this case, maybe you
could also consider event-time timers, but then keep in mind your
watermark emission interval.
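
The side-note above can be pictured with a small, Flink-free model. The class is hypothetical and is not Flink's timer service. With ingestion time, element timestamps are wall-clock values assigned at the source. An event-time timer fires only once a watermark at or past its timestamp has arrived, which is why the watermark emission interval matters. One caveat for tests: when a bounded source finishes, Flink emits a final watermark of Long.MAX_VALUE, so all pending event-time timers fire at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, Flink-free model (not Flink's timer service) of event-time timers:
// a timer for time t fires only once a watermark >= t has arrived, no matter how
// much wall-clock time has passed.
class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the watermark (it never moves backwards) and returns the timers
    // that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

Calling advanceWatermark(Long.MAX_VALUE) models that final end-of-input watermark: every remaining timer fires at once.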

In addition, if you simply want to check the processing-time behaviour of
your operator (not the whole pipeline), you could make use of the
OneInputStreamTaskTestHarness or its keyed variant. This allows you to
provide your own processing time provider, thus allowing you to
deterministically test processing-time behaviour.
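
The idea behind a controllable processing time provider can be shown with another Flink-free model. Processing time only advances when the test says so, so a timer registered 14 days ahead fires neither immediately nor on its own. ManualProcessingTimeService is hypothetical. In Flink's own test utilities, the corresponding entry points are the operator test harnesses. One example is KeyedOneInputStreamOperatorTestHarness, whose setProcessingTime(long) advances the harness's processing time. Exact usage may differ between Flink versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, Flink-free model of a controllable processing-time provider:
// time only moves when the test calls setCurrentTime, so timers cannot fire early
// or "on their own".
class ManualProcessingTimeService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentTime;

    ManualProcessingTimeService(long startTime) {
        this.currentTime = startTime;
    }

    long getCurrentProcessingTime() {
        return currentTime;
    }

    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Moves processing time forward and returns the timers that became due.
    List<Long> setCurrentTime(long newTime) {
        if (newTime < currentTime) {
            throw new IllegalArgumentException("processing time cannot go backwards");
        }
        currentTime = newTime;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentTime) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

A test would register the cleanup timer at getCurrentProcessingTime() plus the TTL. It would then check that nothing fires at TTL - 1 and that the timer fires exactly at the TTL.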

Cheers,
Kostas




Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Posted by Avi Levi <av...@bluevoyant.com>.
Any idea what I should do to overcome this?


Re: ingesting time for TimeCharacteristic.IngestionTime on unit test

Posted by Andrey Zagrebin <an...@ververica.com>.
Hi Avi,

Do you use a processing-time timer
(timerService().registerProcessingTimeTimer)?
Why do you need ingestion time? Do you set TimeCharacteristic.IngestionTime?

Best,
Andrey
