You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2018/08/15 22:24:50 UTC

OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

/**
 * Verifies that a processing-time timer which is still pending when a
 * snapshot is taken is restored when a new test harness is initialized
 * from that snapshot.
 */
public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    // Shared with the TimerFunction, which appends the timestamp of every
    // timer that fires.
    private final List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but every element it
        // processes before any timer has fired will create a timer for
        // (current processing time + 1).
        // Whenever a timer fires, it will create another timer for 
        // (timestamp + 1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create timer(s) for time 1.
        int[] inputs = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which should include the pending timer for time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which includes the
        // timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired: 1..9 before the snapshot, 10..19 after the restore.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            assertTrue("Timer for time " + i + " never fired",
                       _firedTimers.contains(i));
        }
    }

    /**
     * Builds a keyed test harness around the given operator and opens it.
     *
     * @param operator   the operator under test
     * @param savedState snapshot to restore from, or null to start with
     *                   empty state
     * @return the opened test harness
     * @throws Exception if setting up, restoring or opening the harness fails
     */
    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();

        // The saved state must be supplied BEFORE open(): if open() runs
        // first, the harness initializes itself without any state and the
        // timers contained in the snapshot are not restored.
        if (savedState != null) {
            result.initializeState(savedState);
        }
        result.open();
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keyed process function that forwards every input unchanged and keeps a
 * self-renewing processing-time timer going: each timer that fires is
 * recorded in a caller-supplied list and schedules its successor one
 * period later.
 */
@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);

    // Timestamps of all timers that have fired so far; supplied by the caller.
    List<Long> _firedTimers;

    // Distance between a fired timer and the follow-up timer it registers.
    long _period;

    /**
     * Creates a function that uses a period of 1.
     *
     * @param firedTimers list that receives the timestamp of every fired timer
     */
    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    /**
     * @param firedTimers list that receives the timestamp of every fired timer
     * @param period      offset added to a timestamp to get the next timer
     */
    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _period = period;
        _firedTimers = firedTimers;
    }

    /**
     * Records the firing timestamp and registers the follow-up timer at
     * {@code timestamp + period}.
     */
    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);

        _firedTimers.add(timestamp);

        final long followUpTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}", timestamp, followUpTimestamp);
        context.timerService().registerProcessingTimeTimer(followUpTimestamp);
    }

    /**
     * Emits the input as-is. As long as no timer has fired yet, an initial
     * timer is also registered one period after the current processing time.
     */
    @Override
    public void processElement(Integer input,
                               KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                               Collector<Integer> out)
        throws Exception {

        LOGGER.info("Processing input {}", input);

        final boolean noTimerFiredYet = _firedTimers.isEmpty();
        if (noTimerFiredYet) {
            final long now = context.timerService().currentProcessingTime();
            final long initialTimestamp = now + _period;
            LOGGER.info("Setting initial timer for {}", initialTimestamp);
            context.timerService().registerProcessingTimeTimer(initialTimestamp);
        }

        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
No problem :) You motivated me to fix this, since I had stumbled across this bug/issue myself before, and it also took me some time in the debugger to find the cause.

Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler <kk...@transpac.com> wrote:
> 
> Hi Piotr,
> 
> Thanks, and darn it that’s something I should have noticed.
> 
> — Ken
> 
> 
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <piotr@data-artisans.com <ma...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:
>> 
>>         result.open();
>>         if (savedState != null) {
>>             result.initializeState(savedState);
>>         }
>> 
>> Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.
>> 
>> Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159 <https://issues.apache.org/jira/browse/FLINK-10159>
>> 
>> Piotrek
>> 
>>> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_lists@transpac.com <ma...@transpac.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.
>>> 
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>> 
>>> If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?
>>> 
>>> Below is the unit test, which shows how the test harness is being set up and run.
>>> 
>>> The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>>> ==================================================================================================
>>> TimerTest.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import static org.junit.Assert.assertTrue;
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>> 
>>> public class TimerTest {
>>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>>> 
>>>     private List<Long> _firedTimers = new ArrayList<Long>();
>>> 
>>>     @Before
>>>     public void setUp() throws Exception {
>>>     }
>>>     
>>>     @Test
>>>     public void testTimerSaving() throws Throwable {
>>>         
>>>         // This operator doesn't really do much at all, but the first element
>>>         // it processes will create a timer for (timestamp+1).
>>>         // Whenever that timer fires, it will create another timer for 
>>>         // (timestamp+1).
>>>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>>         
>>>         // Create a test harness from scratch
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>>>             makeTestHarness(operator, null);
>>>         
>>>         // We begin at time zero
>>>         testHarness.setProcessingTime(0);
>>> 
>>>         // Process some elements, which should also create a timer for time 1.
>>>         int inputs[] = new int[] {1, 2, 3};
>>>         for (int input : inputs) {
>>>             testHarness.processElement(new StreamRecord<>(input));
>>>         }
>>>         
>>>         // Force some time to pass, which should keep moving the timer ahead,
>>>         // finally leaving it set for time 10.
>>>         for (long i = 1; i < 10; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>         
>>>         // Save the state, which we assume should include the timer we set for
>>>         // time 10.
>>>         OperatorSubtaskState savedState = 
>>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>>>         
>>>         // Close the first test harness
>>>         testHarness.close();
>>>         
>>>         // Create a new test harness using the saved state (which we assume
>>>         // includes the timer for time 10).
>>>         testHarness = makeTestHarness(operator, savedState);
>>>         
>>>         // Force more time to pass, which should keep moving the timer ahead.
>>>         for (long i = 10; i < 20; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>         
>>>         // Close the second test harness and make sure all the timers we expect
>>>         // actually fired.
>>>         testHarness.close();
>>>         for (long i = 1; i < 20; i++) {
>>>             
>>>             // TODO This expectation currently fails, since Timers don't
>>>             // seem to be included in the snapshot, at least the one produced by
>>>             // the test harness.
>>>             assertTrue(_firedTimers.contains(i));
>>>         }
>>>     }
>>> 
>>>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>>>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>>>             OperatorSubtaskState savedState) 
>>>             throws Exception {
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>>>         result = 
>>>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>>>                     operator,
>>>                     new TimerTool.IdentityKeySelector<Integer>(),
>>>                     BasicTypeInfo.INT_TYPE_INFO);
>>>         result.setup();
>>>         result.open();
>>>         if (savedState != null) {
>>>             result.initializeState(savedState);
>>>         }
>>>         return result;
>>>     }
>>> }
>>> 
>>> 
>>> ==================================================================================================
>>> TimerFunction.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import java.util.List;
>>> 
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> @SuppressWarnings("serial")
>>> public class TimerFunction
>>>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>>>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>>>    
>>>     List<Long> _firedTimers;
>>>     long _period;
>>> 
>>>     public TimerFunction(List<Long> firedTimers) {
>>>         this(firedTimers, 1);
>>>     }
>>> 
>>>     public TimerFunction(List<Long> firedTimers, long period) {
>>>         super();
>>>         _firedTimers = firedTimers;
>>>         _period = period;
>>>     }
>>> 
>>>     @Override
>>>     public void onTimer(long timestamp,
>>>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>>>                         Collector<Integer> out) throws Exception {
>>>         super.onTimer(timestamp, context, out);
>>>         _firedTimers.add(timestamp);
>>>         long nextTimestamp = timestamp + _period;
>>>         LOGGER.info("Firing at {}; Setting new timer for {}",
>>>                     timestamp,
>>>                     nextTimestamp);
>>>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>>>     }
>>> 
>>>     @Override
>>>     public void processElement( Integer input,
>>>                                 KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>>>                                 Collector<Integer> out)
>>>         throws Exception {
>>>         
>>>         LOGGER.info("Processing input {}", input);
>>>         if (_firedTimers.isEmpty()) {
>>>             long firstTimestamp = 
>>>                 context.timerService().currentProcessingTime() + _period;
>>>             LOGGER.info("Setting initial timer for {}",
>>>                         firstTimestamp);
>>>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>>>         }
>>>         
>>>         out.collect(input);
>>>     }
>>> }
>>> 
>>> 
>>> 
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 


Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Posted by Ken Krugler <kk...@transpac.com>.
Hi Piotr,

Thanks, and darn it that’s something I should have noticed.

— Ken


> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:
> 
>         result.open();
>         if (savedState != null) {
>             result.initializeState(savedState);
>         }
> 
> Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.
> 
> Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159 <https://issues.apache.org/jira/browse/FLINK-10159>
> 
> Piotrek
> 
>> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_lists@transpac.com <ma...@transpac.com>> wrote:
>> 
>> Hi all,
>> 
>> It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.
>> 
>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>> 
>> If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?
>> 
>> Below is the unit test, which shows how the test harness is being set up and run.
>> 
>> The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> ==================================================================================================
>> TimerTest.java
>> ==================================================================================================
>> package com.scaleunlimited.flinkcrawler.functions;
>> 
>> import static org.junit.Assert.assertTrue;
>> 
>> import java.util.ArrayList;
>> import java.util.List;
>> 
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>> import org.junit.Before;
>> import org.junit.Test;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>> 
>> public class TimerTest {
>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>> 
>>     private List<Long> _firedTimers = new ArrayList<Long>();
>> 
>>     @Before
>>     public void setUp() throws Exception {
>>     }
>>     
>>     @Test
>>     public void testTimerSaving() throws Throwable {
>>         
>>         // This operator doesn't really do much at all, but the first element
>>         // it processes will create a timer for (timestamp+1).
>>         // Whenever that timer fires, it will create another timer for 
>>         // (timestamp+1).
>>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>         
>>         // Create a test harness from scratch
>>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>>             makeTestHarness(operator, null);
>>         
>>         // We begin at time zero
>>         testHarness.setProcessingTime(0);
>> 
>>         // Process some elements, which should also create a timer for time 1.
>>         int inputs[] = new int[] {1, 2, 3};
>>         for (int input : inputs) {
>>             testHarness.processElement(new StreamRecord<>(input));
>>         }
>>         
>>         // Force some time to pass, which should keep moving the timer ahead,
>>         // finally leaving it set for time 10.
>>         for (long i = 1; i < 10; i++) {
>>             testHarness.setProcessingTime(i);
>>         }
>>         
>>         // Save the state, which we assume should include the timer we set for
>>         // time 10.
>>         OperatorSubtaskState savedState = 
>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>>         
>>         // Close the first test harness
>>         testHarness.close();
>>         
>>         // Create a new test harness using the saved state (which we assume
>>         // includes the timer for time 10).
>>         testHarness = makeTestHarness(operator, savedState);
>>         
>>         // Force more time to pass, which should keep moving the timer ahead.
>>         for (long i = 10; i < 20; i++) {
>>             testHarness.setProcessingTime(i);
>>         }
>>         
>>         // Close the second test harness and make sure all the timers we expect
>>         // actually fired.
>>         testHarness.close();
>>         for (long i = 1; i < 20; i++) {
>>             
>>             // TODO This expectation currently fails, since Timers don't
>>             // seem to be included in the snapshot, at least the one produced by
>>             // the test harness.
>>             assertTrue(_firedTimers.contains(i));
>>         }
>>     }
>> 
>>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>>             OperatorSubtaskState savedState) 
>>             throws Exception {
>>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>>         result = 
>>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>>                     operator,
>>                     new TimerTool.IdentityKeySelector<Integer>(),
>>                     BasicTypeInfo.INT_TYPE_INFO);
>>         result.setup();
>>         result.open();
>>         if (savedState != null) {
>>             result.initializeState(savedState);
>>         }
>>         return result;
>>     }
>> }
>> 
>> 
>> ==================================================================================================
>> TimerFunction.java
>> ==================================================================================================
>> package com.scaleunlimited.flinkcrawler.functions;
>> 
>> import java.util.List;
>> 
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> @SuppressWarnings("serial")
>> public class TimerFunction
>>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>>    
>>     List<Long> _firedTimers;
>>     long _period;
>> 
>>     public TimerFunction(List<Long> firedTimers) {
>>         this(firedTimers, 1);
>>     }
>> 
>>     public TimerFunction(List<Long> firedTimers, long period) {
>>         super();
>>         _firedTimers = firedTimers;
>>         _period = period;
>>     }
>> 
>>     @Override
>>     public void onTimer(long timestamp,
>>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>>                         Collector<Integer> out) throws Exception {
>>         super.onTimer(timestamp, context, out);
>>         _firedTimers.add(timestamp);
>>         long nextTimestamp = timestamp + _period;
>>         LOGGER.info("Firing at {}; Setting new timer for {}",
>>                     timestamp,
>>                     nextTimestamp);
>>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>>     }
>> 
>>     @Override
>>     public void processElement( Integer input,
>>                                 KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>>                                 Collector<Integer> out)
>>         throws Exception {
>>         
>>         LOGGER.info("Processing input {}", input);
>>         if (_firedTimers.isEmpty()) {
>>             long firstTimestamp = 
>>                 context.timerService().currentProcessingTime() + _period;
>>             LOGGER.info("Setting initial timer for {}",
>>                         firstTimestamp);
>>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>>         }
>>         
>>         out.collect(input);
>>     }
>> }
>> 
>> 
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

You made a small mistake when restoring from state using the test harness, one that I have also made myself in the past. The problem is with the ordering of these calls:

        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }

open() is supposed to be called after initializeState(). If you look at the code of AbstractStreamOperatorTestHarness#open, you will see that if it is called before initializeState(), it initializes the harness without any state.

Unfortunately, this is implicit behaviour that doesn’t throw any error (the test harness is not part of Flink’s public API). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159

Piotrek

> On 16 Aug 2018, at 00:24, Ken Krugler <kk...@transpac.com> wrote:
> 
> Hi all,
> 
> It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.
> 
> Is this a known limitation of OneInputStreamOperatorTestHarness?
> 
> If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?
> 
> Below is the unit test, which shows how the test harness is being set up and run.
> 
> The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.
> 
> Thanks,
> 
> — Ken
> 
> ==================================================================================================
> TimerTest.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import static org.junit.Assert.assertTrue;
> 
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Before;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
> 
> public class TimerTest {
>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
> 
>     private List<Long> _firedTimers = new ArrayList<Long>();
> 
>     @Before
>     public void setUp() throws Exception {
>     }
>     
>     @Test
>     public void testTimerSaving() throws Throwable {
>         
>         // This operator doesn't really do much at all, but the first element
>         // it processes will create a timer for (timestamp+1).
>         // Whenever that timer fires, it will create another timer for 
>         // (timestamp+1).
>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>         
>         // Create a test harness from scratch
>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>             makeTestHarness(operator, null);
>         
>         // We begin at time zero
>         testHarness.setProcessingTime(0);
> 
>         // Process some elements, which should also create a timer for time 1.
>         int inputs[] = new int[] {1, 2, 3};
>         for (int input : inputs) {
>             testHarness.processElement(new StreamRecord<>(input));
>         }
>         
>         // Force some time to pass, which should keep moving the timer ahead,
>         // finally leaving it set for time 10.
>         for (long i = 1; i < 10; i++) {
>             testHarness.setProcessingTime(i);
>         }
>         
>         // Save the state, which we assume should include the timer we set for
>         // time 10.
>         OperatorSubtaskState savedState = 
>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>         
>         // Close the first test harness
>         testHarness.close();
>         
>         // Create a new test harness using the saved state (which we assume
>         // includes the timer for time 10).
>         testHarness = makeTestHarness(operator, savedState);
>         
>         // Force more time to pass, which should keep moving the timer ahead.
>         for (long i = 10; i < 20; i++) {
>             testHarness.setProcessingTime(i);
>         }
>         
>         // Close the second test harness and make sure all the timers we expect
>         // actually fired.
>         testHarness.close();
>         for (long i = 1; i < 20; i++) {
>             
>             // TODO This expectation currently fails, since Timers don't
>             // seem to be included in the snapshot, at least the one produced by
>             // the test harness.
>             assertTrue(_firedTimers.contains(i));
>         }
>     }
> 
>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>             OperatorSubtaskState savedState) 
>             throws Exception {
>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>         result = 
>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>                     operator,
>                     new TimerTool.IdentityKeySelector<Integer>(),
>                     BasicTypeInfo.INT_TYPE_INFO);
>         result.setup();
>         result.open();
>         if (savedState != null) {
>             result.initializeState(savedState);
>         }
>         return result;
>     }
> }
> 
> 
> ==================================================================================================
> TimerFunction.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import java.util.List;
> 
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> @SuppressWarnings("serial")
> public class TimerFunction
>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>    
>     List<Long> _firedTimers;
>     long _period;
> 
>     public TimerFunction(List<Long> firedTimers) {
>         this(firedTimers, 1);
>     }
> 
>     public TimerFunction(List<Long> firedTimers, long period) {
>         super();
>         _firedTimers = firedTimers;
>         _period = period;
>     }
> 
>     @Override
>     public void onTimer(long timestamp,
>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>                         Collector<Integer> out) throws Exception {
>         super.onTimer(timestamp, context, out);
>         _firedTimers.add(timestamp);
>         long nextTimestamp = timestamp + _period;
>         LOGGER.info("Firing at {}; Setting new timer for {}",
>                     timestamp,
>                     nextTimestamp);
>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>     }
> 
>     @Override
>     public void processElement( Integer input,
>                                 KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>                                 Collector<Integer> out)
>         throws Exception {
>         
>         LOGGER.info("Processing input {}", input);
>         if (_firedTimers.isEmpty()) {
>             long firstTimestamp = 
>                 context.timerService().currentProcessingTime() + _period;
>             LOGGER.info("Setting initial timer for {}",
>                         firstTimestamp);
>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>         }
>         
>         out.collect(input);
>     }
> }
> 
> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>