Posted to user@flink.apache.org by criss <ct...@gmail.com> on 2016/11/16 09:26:55 UTC

Re: Error handling

Hi,

I have this architecture: Kafka topic -> Flink Kafka stream -> Flink custom
sink that saves the data in a PostgreSQL database.
To test how the system behaves when an error occurs, I did the following:
I activated checkpointing on my DataStream, put one item with a special value
in some field on the Kafka topic, and threw an error when processing that item.
What I discovered:
- When the error is thrown inside the DeserializationSchema implementation,
everything is fine: the job recovers as described in the documentation.
- BUT when the error is thrown inside the invoke() implementation of my
RichSinkFunction, no recovery happens and no further items are processed,
even though the Kafka consumer is working fine. Is this normal?
Here are some logs:
2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10141.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Error handling

Posted by Aljoscha Krettek <al...@apache.org>.
Hmm, I still don't know what could be causing this. Which version of Flink
are you using? Also, when you say
"BUT when the error is thrown inside invoke implementation from
RichSinkFunction, the recovery is not done and also no further items are
processed even if the kafka consumer is working fine. "

you mean that the job simply stops and does not try to restart, right? Have
you configured a restart strategy?
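The distinction asked about here (stop for good vs. restart) can be illustrated with a toy fixed-delay policy. This is a hypothetical plain-Java model, not Flink's API: Job and runWithFixedDelay are made-up names, and maxRestarts = 0 plays the role of "no restart". In Flink itself the strategy is configured on the execution environment (for example via env.setRestartStrategy(RestartStrategies.fixedDelayRestart(...)) in the 1.x DataStream API); the defaults differ between versions, so the documentation for the version in use is the authority.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model (plain Java, not Flink's API) of a fixed-delay restart policy. */
public class RestartModel {

    /** Hypothetical stand-in for one execution of the job. */
    @FunctionalInterface
    public interface Job {
        void execute() throws Exception;
    }

    /**
     * Runs the job; after each failure it waits delayMillis and runs it again,
     * at most maxRestarts times (maxRestarts = 0 means "no restart").
     * Returns the total number of executions, or rethrows the last failure
     * once the restart budget is used up.
     */
    public static int runWithFixedDelay(Job job, int maxRestarts, long delayMillis) throws Exception {
        int executions = 0;
        while (true) {
            executions++;
            try {
                job.execute();
                return executions;
            } catch (Exception e) {
                if (executions > maxRestarts) {
                    throw e; // budget exhausted: the job fails for good
                }
                Thread.sleep(delayMillis); // fixed delay before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Fails on its first two executions (e.g. a briefly unreachable database), then succeeds.
        Job flaky = () -> {
            if (calls.incrementAndGet() <= 2) {
                throw new IOException("transient failure");
            }
        };
        System.out.println("fixed delay: " + runWithFixedDelay(flaky, 3, 10) + " executions");
        // prints fixed delay: 3 executions

        try {
            runWithFixedDelay(() -> { throw new IOException("search for error"); }, 0, 10);
        } catch (IOException e) {
            System.out.println("no restart: " + e.getMessage());
            // prints no restart: search for error
        }
    }
}
```

With a transient failure the fixed-delay policy recovers on the third execution; with no restart budget the first failure is final.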

Cheers,
Aljoscha

On Thu, 17 Nov 2016 at 09:48 criss <ct...@gmail.com> wrote:


Re: Error handling

Posted by criss <ct...@gmail.com>.
Hi,

Here is the code that triggers the error (part of the sink):
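@Override
public void invoke(KafkaLog value) throws Exception {
    // ... (code elided in the original post; arg is defined here)
    // Deliberately fail on the marker value to test recovery.
    // "error".equals(arg) is false for null and for non-String values,
    // so an instanceof check and a cast are not needed.
    if ("error".equals(arg)) {
        throw new IOException("search for error");
    }
    // ... (code elided in the original post)
}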
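If failing the whole job on a single bad record is not what is wanted, one common alternative (a different technique from relying on restarts) is to catch record-specific errors inside the sink and divert the record to a dead-letter list or table. The sketch below is plain Java with hypothetical names (DeadLetterSink, RecordWriter); in a real RichSinkFunction the same try/catch would sit inside invoke(). It diverts only IOException, the type thrown in the snippet above, and lets everything else propagate, because swallowing errors that signal a real outage (e.g. a lost database connection) would silently drop data.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of a sink wrapper that diverts records failing with IOException. */
public class DeadLetterSink<T> {

    /** Hypothetical stand-in for the per-record database write done in invoke(). */
    @FunctionalInterface
    public interface RecordWriter<V> {
        void write(V value) throws Exception;
    }

    private final RecordWriter<T> writer;
    private final List<T> deadLetters = new ArrayList<>();

    public DeadLetterSink(RecordWriter<T> writer) {
        this.writer = writer;
    }

    /** Writes one record; an IOException diverts the record instead of failing. */
    public void invoke(T value) throws Exception {
        try {
            writer.write(value);
        } catch (IOException e) {
            deadLetters.add(value); // keep the bad record for later inspection
        }
        // Any other exception propagates and fails the task, as before.
    }

    public List<T> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) throws Exception {
        List<String> stored = new ArrayList<>();
        DeadLetterSink<String> sink = new DeadLetterSink<>(value -> {
            if ("error".equals(value)) {
                throw new IOException("search for error");
            }
            stored.add(value);
        });
        for (String v : Arrays.asList("a", "error", "b")) {
            sink.invoke(v);
        }
        System.out.println(stored + " dead letters: " + sink.deadLetters());
        // prints [a, b] dead letters: [error]
    }
}
```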

And here's the entire stack trace from the log file:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: search for error
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	... 18 more
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
	... 7 more
Caused by: java.io.IOException: search for error
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	... 18 more
2016-11-15 17:44:20,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(TumblingProcessingTimeWindows(10000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: Unnamed (1/2) switched to FAILED with exception.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
	at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
	... 7 more
Caused by: java.io.IOException: search for error
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
	at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
	... 18 more





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10168.html

Re: Error handling

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
is that the complete stack trace or is there more to it? I cannot really
see where the exception originates.
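In the logs above, the exception actually thrown by the sink (java.io.IOException: search for error, from BatchPostgreSqlSinkV4.invoke) only appears at the bottom of the "Caused by" chain, beneath the TimerException and "Could not forward element to next operator" wrappers. A small helper that follows getCause() can pull it out when logging or inspecting failures. This is a plain-Java sketch: RootCause and rootCause are hypothetical names, and the wrappers are rebuilt with JDK exception types standing in for Flink's.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Helper (hypothetical name) that digs the original exception out of a wrapper chain. */
public class RootCause {

    /** Follows getCause() to the innermost throwable, stopping if a cause cycle is detected. */
    public static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        seen.add(current);
        // Set.add returns false for an exception already visited, which ends the walk on a cycle.
        while (current.getCause() != null && seen.add(current.getCause())) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild the shape of the chain in the logs with plain JDK exceptions.
        Throwable sinkError = new IOException("search for error");
        Throwable forward = new RuntimeException("Could not forward element to next operator", sinkError);
        Throwable outer = new RuntimeException("stand-in for the TimerException wrapper", forward);
        Throwable root = rootCause(outer);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
        // prints java.io.IOException: search for error
    }
}
```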

Cheers,
Aljoscha

On Wed, 16 Nov 2016 at 10:38 criss <ct...@gmail.com> wrote:
