Posted to issues@flink.apache.org by "Chris Schneider (JIRA)" <ji...@apache.org> on 2018/06/06 23:38:00 UTC

[jira] [Commented] (FLINK-9262) KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot

    [ https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504052#comment-16504052 ] 

Chris Schneider commented on FLINK-9262:
----------------------------------------

Hi Aljoscha,

I can still reproduce this problem using Flink 1.4-SNAPSHOT (at 90acd78), but I'm not certain that the method I used to do so was correct:
 # I pulled the 1.4 branch to my local flink source directory (at 90acd78).
 # I deleted the entire flink directory from my local Maven repo.
 # I ran {{mvn install -DskipTests}} from my flink source directory (and checked the output to make sure it wasn't downloading any Flink artifacts with version 1.4-SNAPSHOT).
 # I modified the pom.xml in my own source project to depend on the 1.4-SNAPSHOT version of all Flink artifacts (and to avoid using the apache.snapshots repository).
 # I ran {{FlinkIssueTest}} within my own source project.

Unless you see an error in the approach I've described above, I'm inclined to re-open this issue.
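
One way to double-check the steps above is to print where a class was actually loaded from, which shows whether the test picked up the locally built 1.4-SNAPSHOT jar or some other copy. The following is only a minimal sketch using standard JDK calls; the class name {{ArtifactOrigin}} and the method {{originOf}} are hypothetical names introduced for illustration and are not part of Flink or of the original test:

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink): reports which jar or directory a class came from.
public class ArtifactOrigin {

    /** Returns the location the given class was loaded from, or a fallback for JDK bootstrap classes. */
    public static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes loaded by the bootstrap class loader (e.g. java.lang.String) have no code source.
            return "(bootstrap class, no code source)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        // Assumption: inside FlinkIssueTest one would pass
        // KeyedOneInputStreamOperatorTestHarness.class here instead of String.class.
        System.out.println(originOf(String.class));
        System.out.println(originOf(ArtifactOrigin.class));
    }
}
```

Called with the harness class from within the test, the printed location should point at a jar in the local Maven repository if the 1.4-SNAPSHOT build from step 3 is the one in use.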

Please let me know what you think,

Chris

> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> -------------------------------------------------------------------
>
>                 Key: FLINK-9262
>                 URL: https://issues.apache.org/jira/browse/FLINK-9262
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming, Tests
>    Affects Versions: 1.4.0
>         Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1 
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
>            Reporter: Chris Schneider
>            Priority: Blocker
>
> Although KeyedOneInputStreamOperatorTestHarness and other AbstractStreamOperatorTestHarness subclasses are not yet part of the public Flink API, we have been trying to use them for unit testing our map functions. The following code throws an NPE when it tries to take a snapshot on Flink 1.4.0 (even after applying [the fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80] for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>     
>     @SuppressWarnings("serial")
>     private static class MyProcessFunction extends RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String input, Collector<String> collector) throws Exception {
>             collector.collect(input);
>         }
>     }
>     
>     @SuppressWarnings({
>             "serial", "hiding"
>     })
>     private static class MyKeySelector<String> implements KeySelector<String, String> {
>         @Override
>         public String getKey(String input) throws Exception {
>             return input;
>         }
>     }
>     @Test
>     public void test() throws Throwable {
>         KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
>             new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
>                 new StreamFlatMap<>(new MyProcessFunction()),
>                 new MyKeySelector<String>(),
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 1,
>                 1,
>                 0);
>         testHarness.setup();
>         testHarness.open();
>         
>         for (int i = 0; i < 10; i++) {
>             String urlString = String.format("https://domain-%d.com/page1", i);
>             testHarness.processElement(new StreamRecord<>(urlString));
>         }
>         testHarness.snapshot(0L, 0L);
>     }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 25 more
> {noformat}
>  


