Posted to user@flink.apache.org by Chris Schneider <cs...@scaleunlimited.com> on 2018/04/19 01:26:58 UTC

Help with OneInputStreamOperatorTestHarness

Hi Gang,

I’m having trouble getting my streaming unit test to work. Here is the test:

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),   // key selector (code not shown)
                BasicTypeInfo.STRING_TYPE_INFO,        // type of the key
                1,                                     // max parallelism
                1,                                     // number of subtasks
                0);                                    // index of this subtask
        testHarness.setup();
        testHarness.open();

        // Feed ten URLs on ten different domains.
        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }

        // Snapshot for checkpoint 0 at timestamp 0; this is the call that fails.
        testHarness.snapshot(0L, 0L);
    }


Running it produces the following exception:

DomainDBFunctionTest.testDemo
testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
	at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
	... 26 more

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl), but I doubt it has much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------
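
A note on the harness setup in the test above: as far as we can tell from the 1.4-era constructor, the three integer arguments passed to KeyedOneInputStreamOperatorTestHarness are the maximum parallelism, the number of subtasks, and the subtask index (here 1, 1, 0). The plain-Java sketch below does not call Flink. It assumes Flink's usual key-group arithmetic (key group = non-negative key hash modulo max parallelism; owning subtask = key group * parallelism / max parallelism) and shows that with these arguments all ten test keys land on subtask 0. Everything in it is illustrative: `hostKey` is a hypothetical stand-in for PldKeySelector, whose code was not posted, and `Math.floorMod` over `String.hashCode` stands in for Flink's internal murmur hash. It says nothing about the cause of the snapshot NullPointerException.

```java
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupSketch {

    // Hypothetical stand-in for PldKeySelector (not posted in the thread):
    // simply uses the URL's host as the key.
    static String hostKey(String url) {
        return URI.create(url).getHost();
    }

    // Key group for a key hash. Flink applies its own murmur hash first;
    // Math.floorMod here is an assumed stand-in that keeps the result non-negative.
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Subtask that owns a key group (assumed: keyGroup * parallelism / maxParallelism).
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Maps the key of each URL generated by the test loop to its subtask.
    static Map<String, Integer> assignTestUrls(int maxParallelism, int parallelism) {
        Map<String, Integer> assignment = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            String key = hostKey(String.format("https://domain-%d.com/page1", i));
            int keyGroup = keyGroupFor(key.hashCode(), maxParallelism);
            assignment.put(key, subtaskFor(keyGroup, parallelism, maxParallelism));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Harness arguments from the test: max parallelism 1, one subtask.
        // Prints each of the ten keys mapped to subtask 0.
        System.out.println(assignTestUrls(1, 1));
    }
}
```

With a larger maximum parallelism (say `assignTestUrls(128, 2)`), the same keys can fall into different key groups and therefore onto either subtask.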


Re: Help with OneInputStreamOperatorTestHarness

Posted by Chris Schneider <cs...@scaleunlimited.com>.
Hi Fabian,

I created FLINK-9262 <https://issues.apache.org/jira/browse/FLINK-9262>.

FYI,

- Chris

> On Apr 26, 2018, at 3:07 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Thanks for reporting the issue, Chris!
> Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?
> 
> Thank you, Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK
> 
> 2018-04-25 21:11 GMT+02:00 Chris Schneider <cschneider@scaleunlimited.com>:
> Hi Gang,
> 
> FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also tried cherry-picking the commit that fixed FLINK-8268 <https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80> to Flink 1.4.0, but that resulted in the same failure mode.
> 
> I guess the takeaway is that this streaming test harness support (which, everyone should note, is not yet part of the public Flink API) was apparently fragile in 1.4.0.
> 
> FYI,
> 
> - Chris
> 
> 
>> On Apr 18, 2018, at 8:07 PM, Chris Schneider <CSchneider@scaleunlimited.com> wrote:
>> 
>> Hi Ted,
>> 
>> I should have written that we’re using Flink 1.4.0.
>> 
>> Thanks for the suggestion re: FLINK-8268 <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the issue (though the pull request <https://github.com/apache/flink/pull/5193/files> appears fairly complex, so I’ll need some time to study it).
>> 
>> Best Regards,
>> 
>> - Chris
>> 
>>> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>> 
>>> Which release are you using?
>>> 
>>> See if the workaround from FLINK-8268 helps.
>>> 
>>> Cheers
>>> 

