Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2018/08/16 12:15:00 UTC

[jira] [Commented] (FLINK-10155) JobExecutionException when using evictor followed with map

    [ https://issues.apache.org/jira/browse/FLINK-10155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16582422#comment-16582422 ] 

Aljoscha Krettek commented on FLINK-10155:
------------------------------------------

The underlying problem is that the {{WindowFunction}} emits its {{Iterable}} argument directly to the next function. That {{Iterable}} is a Flink-internal implementation that is not meant to be serialized: as the serialization trace shows, it holds a reference back to the {{EvictingWindowOperator}}. By default, Flink copies elements when passing them between chained operations. If there is no known serializer for the type, we fall back to Kryo and copy the object by serializing and deserializing it once. That copy is what throws the exception you see here.

One solution is to move the code that would be in the {{MapFunction}} directly into the {{WindowFunction}}. If that doesn't work, copy the contents of the {{Iterable}} into a type that Flink can serialize reliably, and emit that copy instead of the original {{Iterable}}.
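
As a rough illustration of the second option, here is a minimal plain-Java sketch that copies an {{Iterable}} into an {{ArrayList}}. The class name {{IterableCopy}} and the helper {{toList}} are hypothetical names chosen for this example and are not part of Flink. Using {{ArrayList}} as the target type is an assumption; any type with a well-behaved serializer would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: materializes an Iterable into a
// plain ArrayList so that no reference to Flink-internal objects is emitted.
class IterableCopy {

    // Walks the Iterable once and stores every element in a new ArrayList.
    public static <T> List<T> toList(Iterable<T> input) {
        List<T> copy = new ArrayList<>();
        for (T element : input) {
            copy.add(element);
        }
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for the window contents handed to WindowFunction#apply.
        Iterable<String> windowContents = Arrays.asList("this", "is");
        List<String> materialized = toList(windowContents);
        System.out.println(materialized); // prints [this, is]
    }
}
```

In the reported job this would presumably mean declaring the {{WindowFunction}} output type as {{List<String>}} instead of {{Iterable<String>}}, calling {{out.collect(IterableCopy.toList(input))}}, and changing the input type of the following {{map}} to match.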

> JobExecutionException when using evictor followed with map
> ----------------------------------------------------------
>
>                 Key: FLINK-10155
>                 URL: https://issues.apache.org/jira/browse/FLINK-10155
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.6.0
>            Reporter: Prithvi Raj
>            Priority: Major
>
> The DataStream API encounters `JobExecutionException` when using a `map` after an event time `timeWindow` with an `evictor`. 
> The exception is thrown at `out.collect` on the `WindowFunction` and is not thrown when an `evictor` isn't used, or when not using event time semantics.
>  
> The exception is: 
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest.testMapAfterWindowing(DependenciesProcessorTest.java:101)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:89)
> at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:86)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
> at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onEventTime(EvictingWindowOperator.java:271)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> ... 7 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> ... 22 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> strategies (org.apache.flink.api.common.state.StateTtlConfig$CleanupStrategies)
> cleanupStrategies (org.apache.flink.api.common.state.StateTtlConfig)
> ttlConfig (org.apache.flink.api.common.state.ListStateDescriptor)
> evictingWindowStateDescriptor (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator)
> this$0 (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$2)
> val$function (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> val$fromIterable (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> ... 28 more
> Caused by: java.lang.NullPointerException
> at java.util.EnumMap$EnumMapIterator.hasNext(EnumMap.java:527)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
> at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> ... 50 more{code}
>  
> And the code is:
> {code:java}
> @Test
> public void testMapAfterWindowing() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     DataStreamSource<String> stringStream = env.fromElements("this", "is", "some", "data", "tomfoolery");
>     SingleOutputStreamOperator<String> source = stringStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
>     source.keyBy(new KeySelector<String, String>() {
>         @Override
>         public String getKey(String value) throws Exception {
>             return value.substring(0, 1);
>         }
>     })
>             .timeWindow(Time.milliseconds(100))
>             .evictor(CountEvictor.of(1))
>             .apply(new WindowFunction<String, Iterable<String>, String, TimeWindow>() {
>                 @Override
>                 public void apply(String s, TimeWindow window, Iterable<String> input, Collector<Iterable<String>> out) throws Exception {
>                     out.collect(input);
>                 }
>             })
>             .map(new RichMapFunction<Iterable<String>, Iterable<String>>() {
>                 // Identity map function
>                 @Override
>                 public Iterable<String> map(Iterable<String> value) throws Exception {
>                     return value;
>                 }
>             })
>             .printToErr();
>     env.execute();
> }{code}


