Posted to dev@flink.apache.org by "Matthias J. Sax" <mj...@informatik.hu-berlin.de> on 2015/03/27 22:07:25 UTC

Question about Infinite Streaming Job on Mini Cluster and ITCase

Hi,

I am trying to run an infinite streaming job (i.e., one that does not
terminate because it generates output data randomly on the fly). I
kill this job with the .stop() or .shutdown() method of
ForkableFlinkMiniCluster.

I did not find any example using a similar setup. In the provided
examples, each job terminates automatically, because only a finite input
is processed and the source returns after all data is emitted.


I have several questions about my setup:

 1) The job never terminates cleanly, i.e., I get some exceptions. Is this
behavior desired?

 2) Is it possible to get a result back? Similar to
JobClient.submitJobAndWait(...)?

 3) Is it somehow possible to send a signal to the running job such
that the source can terminate regularly, as if a finite input had been
processed? Right now, I use a while(running) loop and set 'running' to
false in the .cancel() method.
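
The loop described in question 3 can be sketched in plain Java as follows. This is only a minimal illustration, not Flink code: the CancellableSource interface, RandomIntSource, and the demo class are hypothetical stand-ins, and the real source interface in Flink may look different. The flag is declared volatile on the assumption that cancel() is called from a different thread than run().

```java
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a streaming source interface (not Flink's API).
interface CancellableSource<T> {
    void run(Consumer<T> out); // emits records until cancel() is called
    void cancel();             // asks run() to return
}

class RandomIntSource implements CancellableSource<Integer> {
    // volatile, because cancel() is assumed to run on a different thread than run()
    private volatile boolean running = true;
    private final Random random = new Random();

    @Override
    public void run(Consumer<Integer> out) {
        while (running) {
            out.accept(random.nextInt(100));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

class CancellableSourceDemo {
    // Starts the source on its own thread, cancels it once the first record
    // has arrived, and returns the number of records emitted before run() returned.
    static long runUntilCancelled() {
        RandomIntSource source = new RandomIntSource();
        AtomicLong emitted = new AtomicLong();
        CountDownLatch firstRecord = new CountDownLatch(1);
        Thread worker = new Thread(() -> source.run(value -> {
            emitted.incrementAndGet();
            firstRecord.countDown();
        }));
        worker.start();
        try {
            firstRecord.await(); // wait until the source is demonstrably running
            source.cancel();     // flip the flag from the calling thread
            worker.join();       // run() returns on its own, no interrupt needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("records emitted: " + runUntilCancelled());
    }
}
```

With this shape, run() returns normally after cancel(), which is the same exit path a finite source takes once its input is exhausted.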



Thanks for your help!

-Matthias



Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
I agree.

@Marton:
The idea with the extra thread does not work, because the method
JobClient.submitJobAndWait(...) does not return regularly if
ForkableFlinkMiniCluster.shutdown() is called -- instead an exception
occurs:

> Exception in thread "Thread-8" java.lang.RuntimeException: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
> 	at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:119)
> Caused by: org.apache.flink.runtime.client.JobTimeoutException: Lost connection to job manager.
> 	at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:228)
> 	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.scala)
> 	at org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:117)
> Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://flink/user/jobclient#-596117797]] had already been terminated.
> 	at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
> 	at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144)
> 	at org.apache.flink.runtime.client.JobClient$.submitJobAndWait(JobClient.scala:222)
> 	... 2 more


Thus, I cannot get a JobExecutionResult this way, either.
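
One way to at least observe such a failure on the test thread, rather than having it surface only as an uncaught exception in the background thread, is to run the blocking call through a Future. The sketch below is plain Java and makes no claim about Flink's behavior: waitForResult is a hypothetical stand-in for the blocking submit call, and the latch merely simulates the cluster being shut down. It does not recover a result; it only shows where the exception ends up.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FailureCaptureDemo {
    // Hypothetical stand-in for a blocking submit call: it waits until the
    // (simulated) cluster is shut down and then fails instead of returning.
    static String waitForResult(CountDownLatch clusterShutdown) throws Exception {
        clusterShutdown.await();
        throw new IllegalStateException("Lost connection to job manager.");
    }

    static String describeOutcome() {
        CountDownLatch clusterShutdown = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> blockingCall = () -> waitForResult(clusterShutdown);
            Future<String> pending = executor.submit(blockingCall);
            clusterShutdown.countDown(); // simulate shutting the cluster down
            return "result: " + pending.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // the exception thrown on the background thread arrives here as the cause
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome());
    }
}
```

A test could then assert on the captured cause instead of relying on a return value that never arrives.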


-Matthias


On 04/01/2015 02:36 PM, Stephan Ewen wrote:
> As a follow-up: I think it would be a good thing to add a way to gracefully
> stop a streaming job.
> 
> Something that sends "close" to the sources, and they quit.
> 
> We can use this for graceful shutdown when re-partitioning / scaling in or
> out, ...
> 


Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by Stephan Ewen <se...@apache.org>.
As a follow-up: I think it would be a good thing to add a way to gracefully
stop a streaming job.

Something that sends "close" to the sources, and they quit.

We can use this for graceful shutdown when re-partitioning / scaling in or
out, ...
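
To make the idea concrete, here is a toy sketch in plain Java of how a "close" signal to a source could differ from killing the job. Everything in it is an assumption for illustration (the Source and Sink classes, the END_OF_STREAM marker, the bounded queue standing in for a network channel); it is not a proposal for how Flink should implement it. The point is that the source leaves its loop by itself and announces the end of the stream, so downstream can drain and finish as it would for finite input.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class GracefulStopDemo {
    // Hypothetical end-of-stream marker passed through the channel.
    private static final Object END_OF_STREAM = new Object();

    static class Source implements Runnable {
        private final BlockingQueue<Object> out;
        private volatile boolean running = true;
        long emitted = 0; // read by the caller only after join()

        Source(BlockingQueue<Object> out) { this.out = out; }

        @Override
        public void run() {
            try {
                while (running) {
                    out.put(emitted);
                    emitted++;
                }
                out.put(END_OF_STREAM); // tell downstream that no more data follows
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void stop() { running = false; } // the "close" signal
    }

    static class Sink implements Runnable {
        private final BlockingQueue<Object> in;
        long received = 0; // read by the caller only after join()

        Sink(BlockingQueue<Object> in) { this.in = in; }

        @Override
        public void run() {
            try {
                // keep consuming until the source announces the end of the stream
                while (in.take() != END_OF_STREAM) {
                    received++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Returns true if every record emitted before stop() also reached the sink.
    static boolean stopDrainsEverything() {
        BlockingQueue<Object> channel = new ArrayBlockingQueue<>(1024);
        Source source = new Source(channel);
        Sink sink = new Sink(channel);
        Thread sourceThread = new Thread(source);
        Thread sinkThread = new Thread(sink);
        sourceThread.start();
        sinkThread.start();
        try {
            Thread.sleep(10);   // let the pipeline run briefly
            source.stop();      // graceful: no interrupt, no cluster shutdown
            sourceThread.join();
            sinkThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return source.emitted == sink.received;
    }

    public static void main(String[] args) {
        System.out.println("all records drained: " + stopDrainsEverything());
    }
}
```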

On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> Hi,
>
> I will pull the fix and try it out.
>
> Thanks for the hint with the extra thread. That should work for me. But
> you are actually right; my setup is Storm-inspired. I think it's a very
> natural way to deploy and stop an infinite streaming job. Maybe you
> want to adopt it.
>
> The ITCase I am writing is based on StreamingProgramTestBase, so I need the
> JobExecutionResult because the test fails without it.
>
>
> -Matthias
>
>
>

Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Hi,

I will pull the fix and try it out.

Thanks for the hint with the extra thread. That should work for me. But
you are actually right; my setup is Storm-inspired. I think it's a very
natural way to deploy and stop an infinite streaming job. Maybe you
want to adopt it.

The ITCase I am writing is based on StreamingProgramTestBase, so I need the
JobExecutionResult because the test fails without it.


-Matthias



On 04/01/2015 11:09 AM, Márton Balassi wrote:
> Hey Matthias,
> 
> Thanks for reporting the exception; we had not prepared for this use case
> yet. Gyula and I fixed it, and he is pushing the fix right now: once this
> commit is in, you should no longer see that InterruptedException when the
> job is cancelled (for example, because the executor underneath is shut
> down). [1]
> 
> As for getting the streaming JobExecutionResult back from a detached job, my
> current best practice is what you can see in
> ProcessFailureRecoveryTestBase and its streaming implementation:
> starting an executor in a separate thread and then joining it with the main
> one. Would you prefer a more Storm example-ish solution? [2]
> 
> [1] https://github.com/mbalassi/flink/commit/5db06d6d
> [2]
> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
> 


Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by Márton Balassi <ba...@gmail.com>.
Hey Matthias,

Thanks for reporting the exception; we had not prepared for this use case
yet. Gyula and I fixed it, and he is pushing the fix right now: once this
commit is in, you should no longer see that InterruptedException when the
job is cancelled (for example, because the executor underneath is shut
down). [1]

As for getting the streaming JobExecutionResult back from a detached job, my
current best practice is what you can see in
ProcessFailureRecoveryTestBase and its streaming implementation:
starting an executor in a separate thread and then joining it with the main
one. Would you prefer a more Storm example-ish solution? [2]

[1] https://github.com/mbalassi/flink/commit/5db06d6d
[2]
https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104
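
The pattern of running the blocking call in a separate thread and joining it from the main one could look roughly like this. It is a generic Java sketch, not the code from ProcessFailureRecoveryTestBase: the Callable is a hypothetical stand-in for a blocking execute() call, and the latch stands in for whatever makes the job finish.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BackgroundJobDemo {
    static String runInBackground() {
        CountDownLatch stopSignal = new CountDownLatch(1);

        // Hypothetical stand-in for a blocking execute(): it returns a result
        // only once the job has been told to finish.
        Callable<String> blockingExecute = () -> {
            stopSignal.await();
            return "job finished";
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Start the blocking call on a separate thread.
            Future<String> result = executor.submit(blockingExecute);

            // ... the main (test) thread is free to do its own work here ...

            stopSignal.countDown();                 // let the job end
            return result.get(5, TimeUnit.SECONDS); // join and fetch the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInBackground());
    }
}
```

The timeout on get() is a design choice for tests: if the job never ends, the test fails instead of hanging.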

On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> Hi Robert,
>
> thanks for your answer.
>
> I get an InterruptedException when I call shutdown():
>
> java.lang.InterruptedException
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Thread.join(Thread.java:1225)
>         at java.lang.Thread.join(Thread.java:1278)
>         at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
>         at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
>         at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
>         at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>         at java.lang.Thread.run(Thread.java:701)
>
>
> About the JobExecutionResult:
>
> I added a new method to the API that calls
> JobClient.submitJobDetached(...) instead of
> JobClient.submitJobAndWait(...). The "detached" version has no return
> value, while the blocking one returns a JobExecutionResult that is
> further returned by execute(). So I cannot get a JobExecutionResult
> right now.
>
> It would be nice to get the JobExecutionResult when stopping the running
> program via a "stop-execution" call (is there any way to do this?).
> Right now, I sleep for a certain time after calling
> submitJobDetached(...) and then call stop() and shutdown() (from
> ForkableFlinkMiniCluster). The stop() call does not seem to do anything...
> shutdown() works (except for the exception I get, as described above).
>
>
> -Matthias
>
>

Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Hi Robert,

thanks for your answer.

I get an InterruptedException when I call shutdown():

java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1225)
	at java.lang.Thread.join(Thread.java:1278)
	at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
	at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
	at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
	at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
	at java.lang.Thread.run(Thread.java:701)


About the JobExecutionResult:

I added a new method to the API that calls
JobClient.submitJobDetached(...) instead of
JobClient.submitJobAndWait(...). The "detached" version has no return
value, while the blocking one returns a JobExecutionResult that is
further returned by execute(). So I cannot get a JobExecutionResult
right now.

It would be nice to get the JobExecutionResult when stopping the running
program via a "stop-execution" call (is there any way to do this?).
Right now, I sleep for a certain time after calling
submitJobDetached(...) and then call stop() and shutdown() (from
ForkableFlinkMiniCluster). The stop() call does not seem to do anything...
shutdown() works (except for the exception I get, as described above).


-Matthias


On 03/30/2015 09:08 PM, Robert Metzger wrote:
> Hi Matthias,
> 
> the streaming folks can probably answer the questions better. But I'll
> write something to bring this message back to their attention ;)
> 
> 1) Which exceptions are you seeing? Flink should be able to shut down
> cleanly.
> 2) As far as I can see, the execute() method (of the Streaming API) got a
> JobExecutionResult return type in the latest master. That contains
> accumulator results.
> 3) I think the cancel() method is there for exactly that purpose. If the
> job is shutting down before the cancel method is called, that is probably a bug.
> 
> 
> Robert
> 
> 
> 


Re: Question about Infinite Streaming Job on Mini Cluster and ITCase

Posted by Robert Metzger <rm...@apache.org>.
Hi Matthias,

the streaming folks can probably answer the questions better. But I'll
write something to bring this message back to their attention ;)

1) Which exceptions are you seeing? Flink should be able to shut down
cleanly.
2) As far as I can see, the execute() method (of the Streaming API) got a
JobExecutionResult return type in the latest master. That contains
accumulator results.
3) I think the cancel() method is there for exactly that purpose. If the
job is shutting down before the cancel method is called, that is probably a bug.


Robert



On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax <
mjsax@informatik.hu-berlin.de> wrote:

> Hi,
>
> I am trying to run an infinite streaming job (i.e., one that does not
> terminate because it generates output data randomly on the fly). I
> kill this job with the .stop() or .shutdown() method of
> ForkableFlinkMiniCluster.
>
> I did not find any example using a similar setup. In the provided
> examples, each job terminates automatically, because only a finite input
> is processed and the source returns after all data is emitted.
>
>
> I have several questions about my setup:
>
>  1) The job never terminates cleanly, i.e., I get some exceptions. Is this
> behavior desired?
>
>  2) Is it possible to get a result back? Similar to
> JobClient.submitJobAndWait(...)?
>
>  3) Is it somehow possible to send a signal to the running job such
> that the source can terminate regularly, as if a finite input had been
> processed? Right now, I use a while(running) loop and set 'running' to
> false in the .cancel() method.
>
>
>
> Thanks for your help!
>
> -Matthias
>
>
>