Posted to issues@flink.apache.org by "Gyula Fora (JIRA)" <ji...@apache.org> on 2014/07/10 17:05:06 UTC

[jira] [Commented] (FLINK-1021) IllegalStateException at InputGate

    [ https://issues.apache.org/jira/browse/FLINK-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14057551#comment-14057551 ] 

Gyula Fora commented on FLINK-1021:
-----------------------------------

Also, with the same setup, we ran:

https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-examples/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java

We get errors during the emits:

java.lang.IllegalStateException: Pending serialization of previous record.
	at eu.stratosphere.runtime.io.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:62)
	at eu.stratosphere.runtime.io.api.RecordWriter.emit(RecordWriter.java:76)
	at eu.stratosphere.streaming.api.streamrecord.StreamCollector.emit(StreamCollector.java:75)
	at eu.stratosphere.streaming.api.streamrecord.StreamCollector.collect(StreamCollector.java:55)
	at eu.stratosphere.streaming.api.streamrecord.StreamCollectorManager.collect(StreamCollectorManager.java:72)
	at eu.stratosphere.streaming.api.streamrecord.StreamCollectorManager.collect(StreamCollectorManager.java:1)
	at eu.stratosphere.streaming.api.invokable.MapInvokable.invoke(MapInvokable.java:37)
	at eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:257)
	at eu.stratosphere.streaming.api.streamcomponent.StreamTask.invoke(StreamTask.java:102)
	at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
	at java.lang.Thread.run(Unknown Source)

These two issues might be linked.
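
To make the first message concrete, here is a minimal sketch of the invariant it seems to describe: a serializer that holds one record at a time and rejects a new record while bytes of the previous one are still waiting to be copied into an output buffer. This is a toy model under my own assumptions, not the actual SpanningRecordSerializer code; PendingRecordSketch, ToySerializer, drainInto and the emit helper are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

public class PendingRecordSketch {

    /** Toy serializer (hypothetical): holds at most one record at a time. */
    public static final class ToySerializer {
        // Bytes of the current record that have not been copied out yet.
        private ByteBuffer pending = ByteBuffer.allocate(0);

        /** Accepts a new record only if the previous one was fully drained. */
        public void addRecord(byte[] record) {
            if (pending.hasRemaining()) {
                throw new IllegalStateException("Pending serialization of previous record.");
            }
            pending = ByteBuffer.wrap(record);
        }

        /** Copies as many pending bytes as fit into target; true once the record is complete. */
        public boolean drainInto(ByteBuffer target) {
            int n = Math.min(pending.remaining(), target.remaining());
            for (int i = 0; i < n; i++) {
                target.put(pending.get());
            }
            return !pending.hasRemaining();
        }
    }

    /** Emits one record, spanning it over as many fixed-size buffers as needed. */
    public static int emit(ToySerializer serializer, byte[] record, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        serializer.addRecord(record);
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        int buffersUsed = 1;
        while (!serializer.drainInto(buffer)) {
            // Stands in for handing the full buffer downstream and requesting a fresh one.
            buffer.clear();
            buffersUsed++;
        }
        return buffersUsed;
    }

    public static void main(String[] args) {
        ToySerializer serializer = new ToySerializer();

        // A 4-byte record spans two 2-byte buffers; the serializer ends up empty.
        System.out.println("buffers used: " + emit(serializer, new byte[] {1, 2, 3, 4}, 2));

        // Adding a record without draining it leaves bytes pending ...
        serializer.addRecord(new byte[] {5, 6});
        try {
            // ... so the next addRecord is rejected.
            serializer.addRecord(new byte[] {7});
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the exception only appears when a caller adds a record before the previous one has been completely copied out. Whether that is what happens on the output-flush branch is still an open question.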

> IllegalStateException at InputGate
> ----------------------------------
>
>                 Key: FLINK-1021
>                 URL: https://issues.apache.org/jira/browse/FLINK-1021
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 0.6-incubating
>         Environment: Linux/Windows
>            Reporter: Gyula Fora
>
> Until now, we flushed the outputs after every emit using the .flush() method of the RecordWriter. We have now removed this flush() call and made two interesting observations:
> First, if we don't send enough records, the source finishes but the output buffer never gets flushed.
> Second, if we generate a simple data stream from, let's say, the first 1500 numbers, we get an exception in the InputGates (after, let's say, a hundred records):
> java.lang.IllegalStateException: Channel received an event before completing the current partial record.
> 	at eu.stratosphere.runtime.io.channels.InputChannel.readRecord(InputChannel.java:177)
> 	at eu.stratosphere.runtime.io.gates.InputGate.readRecord(InputGate.java:173)
> 	at eu.stratosphere.streaming.api.streamcomponent.StreamRecordReader.hasNext(StreamRecordReader.java:96)
> 	at eu.stratosphere.streaming.api.streamcomponent.AbstractStreamComponent.invokeRecords(AbstractStreamComponent.java:255)
> 	at eu.stratosphere.streaming.api.streamcomponent.StreamSink.invoke(StreamSink.java:74)
> 	at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:260)
> 	at java.lang.Thread.run(Unknown Source)
> To reproduce the error, run this test: https://github.com/stratosphere/stratosphere-streaming/blob/output-flush/stratosphere-streaming-core/src/test/java/eu/stratosphere/streaming/api/PrintTest.java
> Please note that this is the output-flush branch of stratosphere-streaming.
> This works perfectly if we flush the outputs after the emits.


