You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Tim Robertson <ti...@gmail.com> on 2018/04/01 09:35:45 UTC

DirectRunner in test - await completion of workers threads?

Hi devs

I'm working on SolrIO tests for failure scenarios (i.e. an exception will
come out of the pipeline execution).  I see that the exception is surfaced
to the driver while "direct-runner-worker" threads are still running.  This
causes issue because:

  1. The Solr tests do thread leak detection, and a solrClient.close() is
what removes the object
  2. @Teardown is not necessarily called which is what would close the
solrClient

I can unregister all the solrClients that have been spawned.  However I
have seen race conditions where there are still threads running creating
and registering clients. I need to someone ensure that all workers related
to the pipeline execution are indeed finished so no new ones are created
after the first exception is passed up.

Currently I have this (psuedo code) which works, but I suspect someone can
suggest a better approach:

// store the state of clients registered for object leak check
Set<Object> existingClients = registeredSolrClients();
try {
  pipeline.run();

} catch (Pipeline.PipelineExecutionException e) {


// Hack: await all bundle workers completing
while (namedThreadStillExists("direct-runner-worker")) {
Thread.sleep(100);
}

// remove all solrClients created in this execution only
// since the teardown may not have done so
for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
if (o instanceof SolrClient && !existingClients.contains(o)) {
ObjectReleaseTracker.release(o);
}
}

// now we can do our assertions
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG,
1));


Please do point out the obvious if I am missing it - I am a newbie here...

Thank you all very much,
Tim
(timrobertson100@gmail.com on the slack apache/beam channel)

Re: DirectRunner in test - await completion of workers threads?

Posted by Ismaël Mejía <ie...@gmail.com>.
It seems there is still an issue with teardown not being called in
failed tasks, just created BEAM-4040 to track it.

On Thu, Apr 5, 2018 at 4:45 PM, Tim Robertson <ti...@gmail.com> wrote:
> Will do - I'll report the result on https://github.com/apache/beam/pull/4905
>
> On Thu, Apr 5, 2018 at 11:45 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>>
>> For info, Romain's PR was merged today, can you confirm if this fixes
>> the issue Tim.
>>
>> On Sun, Apr 1, 2018 at 9:21 PM, Tim Robertson <ti...@gmail.com>
>> wrote:
>> > Thanks all.
>> >
>> > I went with what I outlined above, which you can see in this test.
>> >
>> > https://github.com/timrobertson100/beam/blob/BEAM-3848/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java#L285
>> >
>> > That forms part of this PR https://github.com/apache/beam/pull/4956
>> >
>> > I'll monitor Romain's PR and back it out when appropriate.
>> >
>> >
>> >
>> >
>> >
>> > On Sun, Apr 1, 2018 at 8:20 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
>> > wrote:
>> >>
>> >> Indeed. It's exactly what Romain's PR is about.
>> >>
>> >> Regards
>> >> JB
>> >> Le 1 avr. 2018, à 19:33, Reuven Lax <re...@google.com> a écrit:
>> >>>
>> >>> Correct - teardown is currently run in the direct runner, but
>> >>> asynchronously. I believe Romain's pending PRs should solve this for
>> >>> your
>> >>> use case.
>> >>>
>> >>> On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <
>> >>> timrobertson100@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Thanks for confirming Romain - also for the very fast reply!
>> >>>>
>> >>>> I'll continue with the workaround and reference BEAM-3409 inline as
>> >>>> justification.
>> >>>> I'm trying to wrap this up before travel next week, but if I get a
>> >>>> chance I'll try and run this scenario (BEAM-3848) with your patch.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau
>> >>>> <rm...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hi
>> >>>>>
>> >>>>> I have the same blocker and created
>> >>>>>
>> >>>>> https://github.com/apache/beam/pull/4790 and
>> >>>>> https://github.com/apache/beam/pull/4965 to solve part of it
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Le 1 avr. 2018 11:35, "Tim Robertson" < timrobertson100@gmail.com> a
>> >>>>> écrit :
>> >>>>>
>> >>>>> Hi devs
>> >>>>>
>> >>>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
>> >>>>> will come out of the pipeline execution).  I see that the exception
>> >>>>> is
>> >>>>> surfaced to the driver while " direct-runner-worker" threads are
>> >>>>> still
>> >>>>> running.  This causes issue because:
>> >>>>>
>> >>>>>   1. The Solr tests do thread leak detection, and a
>> >>>>> solrClient.close()
>> >>>>> is what removes the object
>> >>>>>   2. @Teardown is not necessarily called which is what would close
>> >>>>> the
>> >>>>> solrClient
>> >>>>>
>> >>>>> I can unregister all the solrClients that have been spawned.
>> >>>>> However I
>> >>>>> have seen race conditions where there are still threads running
>> >>>>> creating and
>> >>>>> registering clients. I need to someone ensure that all workers
>> >>>>> related to
>> >>>>> the pipeline execution are indeed finished so no new ones are
>> >>>>> created after
>> >>>>> the first exception is passed up.
>> >>>>>
>> >>>>> Currently I have this (psuedo code) which works, but I suspect
>> >>>>> someone
>> >>>>> can suggest a better approach:
>> >>>>>
>> >>>>> // store the state of clients registered for object leak check
>> >>>>> Set<Object> existingClients = registeredSolrClients();
>> >>>>> try {
>> >>>>>   pipeline.run();
>> >>>>>
>> >>>>> } catch (Pipeline.PipelineExecutionException e) {
>> >>>>>
>> >>>>>
>> >>>>>   // Hack: await all bundle workers completing
>> >>>>>   while (namedThreadStillExists("direct-runner-worker")) {
>> >>>>>     Thread.sleep(100);
>> >>>>>   }
>> >>>>>
>> >>>>>   // remove all solrClients created in this execution only
>> >>>>>   // since the teardown may not have done so
>> >>>>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>> >>>>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>> >>>>>       ObjectReleaseTracker.release(o);
>> >>>>>     }
>> >>>>>   }
>> >>>>>
>> >>>>>   // now we can do our assertions
>> >>>>>
>> >>>>>
>> >>>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG,
>> >>>>> 1));
>> >>>>>
>> >>>>>
>> >>>>> Please do point out the obvious if I am missing it - I am a newbie
>> >>>>> here...
>> >>>>>
>> >>>>> Thank you all very much,
>> >>>>> Tim
>> >>>>> ( timrobertson100@gmail.com on the slack apache/beam channel)
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >
>
>

Re: DirectRunner in test - await completion of workers threads?

Posted by Tim Robertson <ti...@gmail.com>.
Will do - I'll report the result on https://github.com/apache/beam/pull/4905


On Thu, Apr 5, 2018 at 11:45 AM, Ismaël Mejía <ie...@gmail.com> wrote:

> For info, Romain's PR was merged today, can you confirm if this fixes
> the issue Tim.
>
> On Sun, Apr 1, 2018 at 9:21 PM, Tim Robertson <ti...@gmail.com>
> wrote:
> > Thanks all.
> >
> > I went with what I outlined above, which you can see in this test.
> > https://github.com/timrobertson100/beam/blob/
> BEAM-3848/sdks/java/io/solr/src/test/java/org/apache/beam/
> sdk/io/solr/SolrIOTest.java#L285
> >
> > That forms part of this PR https://github.com/apache/beam/pull/4956
> >
> > I'll monitor Romain's PR and back it out when appropriate.
> >
> >
> >
> >
> >
> > On Sun, Apr 1, 2018 at 8:20 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> > wrote:
> >>
> >> Indeed. It's exactly what Romain's PR is about.
> >>
> >> Regards
> >> JB
> >> Le 1 avr. 2018, à 19:33, Reuven Lax <re...@google.com> a écrit:
> >>>
> >>> Correct - teardown is currently run in the direct runner, but
> >>> asynchronously. I believe Romain's pending PRs should solve this for
> your
> >>> use case.
> >>>
> >>> On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <
> timrobertson100@gmail.com>
> >>> wrote:
> >>>>
> >>>> Thanks for confirming Romain - also for the very fast reply!
> >>>>
> >>>> I'll continue with the workaround and reference BEAM-3409 inline as
> >>>> justification.
> >>>> I'm trying to wrap this up before travel next week, but if I get a
> >>>> chance I'll try and run this scenario (BEAM-3848) with your patch.
> >>>>
> >>>>
> >>>>
> >>>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau
> >>>> <rm...@gmail.com> wrote:
> >>>>>
> >>>>> Hi
> >>>>>
> >>>>> I have the same blocker and created
> >>>>>
> >>>>> https://github.com/apache/beam/pull/4790 and
> >>>>> https://github.com/apache/beam/pull/4965 to solve part of it
> >>>>>
> >>>>>
> >>>>>
> >>>>> Le 1 avr. 2018 11:35, "Tim Robertson" < timrobertson100@gmail.com> a
> >>>>> écrit :
> >>>>>
> >>>>> Hi devs
> >>>>>
> >>>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
> >>>>> will come out of the pipeline execution).  I see that the exception
> is
> >>>>> surfaced to the driver while " direct-runner-worker" threads are
> still
> >>>>> running.  This causes issue because:
> >>>>>
> >>>>>   1. The Solr tests do thread leak detection, and a
> solrClient.close()
> >>>>> is what removes the object
> >>>>>   2. @Teardown is not necessarily called which is what would close
> the
> >>>>> solrClient
> >>>>>
> >>>>> I can unregister all the solrClients that have been spawned.
> However I
> >>>>> have seen race conditions where there are still threads running
> creating and
> >>>>> registering clients. I need to someone ensure that all workers
> related to
> >>>>> the pipeline execution are indeed finished so no new ones are
> created after
> >>>>> the first exception is passed up.
> >>>>>
> >>>>> Currently I have this (psuedo code) which works, but I suspect
> someone
> >>>>> can suggest a better approach:
> >>>>>
> >>>>> // store the state of clients registered for object leak check
> >>>>> Set<Object> existingClients = registeredSolrClients();
> >>>>> try {
> >>>>>   pipeline.run();
> >>>>>
> >>>>> } catch (Pipeline.PipelineExecutionException e) {
> >>>>>
> >>>>>
> >>>>>   // Hack: await all bundle workers completing
> >>>>>   while (namedThreadStillExists("direct-runner-worker")) {
> >>>>>     Thread.sleep(100);
> >>>>>   }
> >>>>>
> >>>>>   // remove all solrClients created in this execution only
> >>>>>   // since the teardown may not have done so
> >>>>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
> >>>>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
> >>>>>       ObjectReleaseTracker.release(o);
> >>>>>     }
> >>>>>   }
> >>>>>
> >>>>>   // now we can do our assertions
> >>>>>
> >>>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.
> WriteFn.RETRY_ATTEMPT_LOG,
> >>>>> 1));
> >>>>>
> >>>>>
> >>>>> Please do point out the obvious if I am missing it - I am a newbie
> >>>>> here...
> >>>>>
> >>>>> Thank you all very much,
> >>>>> Tim
> >>>>> ( timrobertson100@gmail.com on the slack apache/beam channel)
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >
>

Re: DirectRunner in test - await completion of workers threads?

Posted by Ismaël Mejía <ie...@gmail.com>.
For info, Romain's PR was merged today, can you confirm if this fixes
the issue Tim.

On Sun, Apr 1, 2018 at 9:21 PM, Tim Robertson <ti...@gmail.com> wrote:
> Thanks all.
>
> I went with what I outlined above, which you can see in this test.
> https://github.com/timrobertson100/beam/blob/BEAM-3848/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java#L285
>
> That forms part of this PR https://github.com/apache/beam/pull/4956
>
> I'll monitor Romain's PR and back it out when appropriate.
>
>
>
>
>
> On Sun, Apr 1, 2018 at 8:20 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>>
>> Indeed. It's exactly what Romain's PR is about.
>>
>> Regards
>> JB
>> Le 1 avr. 2018, à 19:33, Reuven Lax <re...@google.com> a écrit:
>>>
>>> Correct - teardown is currently run in the direct runner, but
>>> asynchronously. I believe Romain's pending PRs should solve this for your
>>> use case.
>>>
>>> On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson < timrobertson100@gmail.com>
>>> wrote:
>>>>
>>>> Thanks for confirming Romain - also for the very fast reply!
>>>>
>>>> I'll continue with the workaround and reference BEAM-3409 inline as
>>>> justification.
>>>> I'm trying to wrap this up before travel next week, but if I get a
>>>> chance I'll try and run this scenario (BEAM-3848) with your patch.
>>>>
>>>>
>>>>
>>>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau
>>>> <rm...@gmail.com> wrote:
>>>>>
>>>>> Hi
>>>>>
>>>>> I have the same blocker and created
>>>>>
>>>>> https://github.com/apache/beam/pull/4790 and
>>>>> https://github.com/apache/beam/pull/4965 to solve part of it
>>>>>
>>>>>
>>>>>
>>>>> Le 1 avr. 2018 11:35, "Tim Robertson" < timrobertson100@gmail.com> a
>>>>> écrit :
>>>>>
>>>>> Hi devs
>>>>>
>>>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
>>>>> will come out of the pipeline execution).  I see that the exception is
>>>>> surfaced to the driver while " direct-runner-worker" threads are still
>>>>> running.  This causes issue because:
>>>>>
>>>>>   1. The Solr tests do thread leak detection, and a solrClient.close()
>>>>> is what removes the object
>>>>>   2. @Teardown is not necessarily called which is what would close the
>>>>> solrClient
>>>>>
>>>>> I can unregister all the solrClients that have been spawned.  However I
>>>>> have seen race conditions where there are still threads running creating and
>>>>> registering clients. I need to someone ensure that all workers related to
>>>>> the pipeline execution are indeed finished so no new ones are created after
>>>>> the first exception is passed up.
>>>>>
>>>>> Currently I have this (psuedo code) which works, but I suspect someone
>>>>> can suggest a better approach:
>>>>>
>>>>> // store the state of clients registered for object leak check
>>>>> Set<Object> existingClients = registeredSolrClients();
>>>>> try {
>>>>>   pipeline.run();
>>>>>
>>>>> } catch (Pipeline.PipelineExecutionException e) {
>>>>>
>>>>>
>>>>>   // Hack: await all bundle workers completing
>>>>>   while (namedThreadStillExists("direct-runner-worker")) {
>>>>>     Thread.sleep(100);
>>>>>   }
>>>>>
>>>>>   // remove all solrClients created in this execution only
>>>>>   // since the teardown may not have done so
>>>>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>>>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>>>>>       ObjectReleaseTracker.release(o);
>>>>>     }
>>>>>   }
>>>>>
>>>>>   // now we can do our assertions
>>>>>
>>>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG,
>>>>> 1));
>>>>>
>>>>>
>>>>> Please do point out the obvious if I am missing it - I am a newbie
>>>>> here...
>>>>>
>>>>> Thank you all very much,
>>>>> Tim
>>>>> ( timrobertson100@gmail.com on the slack apache/beam channel)
>>>>>
>>>>>
>>>>>
>>>>
>

Re: DirectRunner in test - await completion of workers threads?

Posted by Tim Robertson <ti...@gmail.com>.
Thanks all.

I went with what I outlined above, which you can see in this test.
https://github.com/timrobertson100/beam/blob/BEAM-3848/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java#L285

That forms part of this PR https://github.com/apache/beam/pull/4956

I'll monitor Romain's PR and back it out when appropriate.





On Sun, Apr 1, 2018 at 8:20 PM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Indeed. It's exactly what Romain's PR is about.
>
> Regards
> JB
> Le 1 avr. 2018, à 19:33, Reuven Lax <re...@google.com> a écrit:
>
>> Correct - teardown is currently run in the direct runner, but
>> asynchronously. I believe Romain's pending PRs should solve this for your
>> use case.
>>
>> On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson < timrobertson100@gmail.com>
>> wrote:
>>
>>> Thanks for confirming Romain - also for the very fast reply!
>>>
>>> I'll continue with the workaround and reference BEAM-3409 inline as
>>> justification.
>>> I'm trying to wrap this up before travel next week, but if I get a
>>> chance I'll try and run this scenario (BEAM-3848) with your patch.
>>>
>>>
>>>
>>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I have the same blocker and created
>>>>
>>>> https://github.com/apache/beam/pull/4790 and
>>>> https://github.com/apache/beam/pull/4965 to solve part of it
>>>>
>>>>
>>>>
>>>> Le 1 avr. 2018 11:35, "Tim Robertson" < timrobertson100@gmail.com> a
>>>> écrit :
>>>>
>>>> Hi devs
>>>>
>>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
>>>> will come out of the pipeline execution).  I see that the exception is
>>>> surfaced to the driver while " direct-runner-worker" threads are still
>>>> running.  This causes issue because:
>>>>
>>>>   1. The Solr tests do thread leak detection, and a solrClient.close()
>>>> is what removes the object
>>>>   2. @Teardown is not necessarily called which is what would close the
>>>> solrClient
>>>>
>>>> I can unregister all the solrClients that have been spawned.  However I
>>>> have seen race conditions where there are still threads running creating
>>>> and registering clients. I need to someone ensure that all workers related
>>>> to the pipeline execution are indeed finished so no new ones are created
>>>> after the first exception is passed up.
>>>>
>>>> Currently I have this (psuedo code) which works, but I suspect someone
>>>> can suggest a better approach:
>>>>
>>>> // store the state of clients registered for object leak check
>>>> Set<Object> existingClients = registeredSolrClients();
>>>> try {
>>>>   pipeline.run();
>>>>
>>>> } catch (Pipeline.PipelineExecutionException e) {
>>>>
>>>>
>>>> // Hack: await all bundle workers completing
>>>> while (namedThreadStillExists("direct-runner-worker")) {
>>>> Thread.sleep(100);
>>>> }
>>>>
>>>> // remove all solrClients created in this execution only
>>>> // since the teardown may not have done so
>>>> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>>> if (o instanceof SolrClient && !existingClients.contains(o)) {
>>>> ObjectReleaseTracker.release(o);
>>>> }
>>>> }
>>>>
>>>> // now we can do our assertions
>>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.
>>>> RETRY_ATTEMPT_LOG, 1));
>>>>
>>>>
>>>> Please do point out the obvious if I am missing it - I am a newbie
>>>> here...
>>>>
>>>> Thank you all very much,
>>>> Tim
>>>> ( timrobertson100@gmail.com on the slack apache/beam channel)
>>>>
>>>>
>>>>
>>>>
>>>

Re: DirectRunner in test - await completion of workers threads?

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Indeed. It's exactly what Romain's PR is about.

Regards
JB

Le 1 avr. 2018 à 19:33, à 19:33, Reuven Lax <re...@google.com> a écrit:
>Correct - teardown is currently run in the direct runner, but
>asynchronously. I believe Romain's pending PRs should solve this for
>your
>use case.
>
>On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson
><ti...@gmail.com>
>wrote:
>
>> Thanks for confirming Romain - also for the very fast reply!
>>
>> I'll continue with the workaround and reference BEAM-3409 inline as
>> justification.
>> I'm trying to wrap this up before travel next week, but if I get a
>chance
>> I'll try and run this scenario (BEAM-3848) with your patch.
>>
>>
>>
>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau
><rmannibucau@gmail.com
>> > wrote:
>>
>>> Hi
>>>
>>> I have the same blocker and created
>>>
>>> https://github.com/apache/beam/pull/4790 and
>>> https://github.com/apache/beam/pull/4965 to solve part of it
>>>
>>>
>>>
>>> Le 1 avr. 2018 11:35, "Tim Robertson" <ti...@gmail.com> a
>>> écrit :
>>>
>>> Hi devs
>>>
>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
>will
>>> come out of the pipeline execution).  I see that the exception is
>surfaced
>>> to the driver while "direct-runner-worker" threads are still
>running.
>>> This causes issue because:
>>>
>>>   1. The Solr tests do thread leak detection, and a
>solrClient.close()
>>> is what removes the object
>>>   2. @Teardown is not necessarily called which is what would close
>the
>>> solrClient
>>>
>>> I can unregister all the solrClients that have been spawned. 
>However I
>>> have seen race conditions where there are still threads running
>creating
>>> and registering clients. I need to someone ensure that all workers
>related
>>> to the pipeline execution are indeed finished so no new ones are
>created
>>> after the first exception is passed up.
>>>
>>> Currently I have this (psuedo code) which works, but I suspect
>someone
>>> can suggest a better approach:
>>>
>>> // store the state of clients registered for object leak check
>>> Set<Object> existingClients = registeredSolrClients();
>>> try {
>>>   pipeline.run();
>>>
>>> } catch (Pipeline.PipelineExecutionException e) {
>>>
>>>
>>> // Hack: await all bundle workers completing
>>> while (namedThreadStillExists("direct-runner-worker")) {
>>> Thread.sleep(100);
>>> }
>>>
>>> // remove all solrClients created in this execution only
>>> // since the teardown may not have done so
>>> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>> if (o instanceof SolrClient && !existingClients.contains(o)) {
>>> ObjectReleaseTracker.release(o);
>>> }
>>> }
>>>
>>> // now we can do our assertions
>>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.
>>> RETRY_ATTEMPT_LOG, 1));
>>>
>>>
>>> Please do point out the obvious if I am missing it - I am a newbie
>here...
>>>
>>> Thank you all very much,
>>> Tim
>>> (timrobertson100@gmail.com on the slack apache/beam channel)
>>>
>>>
>>>
>>>
>>

Re: DirectRunner in test - await completion of workers threads?

Posted by Reuven Lax <re...@google.com>.
Correct - teardown is currently run in the direct runner, but
asynchronously. I believe Romain's pending PRs should solve this for your
use case.

On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <ti...@gmail.com>
wrote:

> Thanks for confirming Romain - also for the very fast reply!
>
> I'll continue with the workaround and reference BEAM-3409 inline as
> justification.
> I'm trying to wrap this up before travel next week, but if I get a chance
> I'll try and run this scenario (BEAM-3848) with your patch.
>
>
>
> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rmannibucau@gmail.com
> > wrote:
>
>> Hi
>>
>> I have the same blocker and created
>>
>> https://github.com/apache/beam/pull/4790 and
>> https://github.com/apache/beam/pull/4965 to solve part of it
>>
>>
>>
>> Le 1 avr. 2018 11:35, "Tim Robertson" <ti...@gmail.com> a
>> écrit :
>>
>> Hi devs
>>
>> I'm working on SolrIO tests for failure scenarios (i.e. an exception will
>> come out of the pipeline execution).  I see that the exception is surfaced
>> to the driver while "direct-runner-worker" threads are still running.
>> This causes issue because:
>>
>>   1. The Solr tests do thread leak detection, and a solrClient.close()
>> is what removes the object
>>   2. @Teardown is not necessarily called which is what would close the
>> solrClient
>>
>> I can unregister all the solrClients that have been spawned.  However I
>> have seen race conditions where there are still threads running creating
>> and registering clients. I need to someone ensure that all workers related
>> to the pipeline execution are indeed finished so no new ones are created
>> after the first exception is passed up.
>>
>> Currently I have this (psuedo code) which works, but I suspect someone
>> can suggest a better approach:
>>
>> // store the state of clients registered for object leak check
>> Set<Object> existingClients = registeredSolrClients();
>> try {
>>   pipeline.run();
>>
>> } catch (Pipeline.PipelineExecutionException e) {
>>
>>
>> // Hack: await all bundle workers completing
>> while (namedThreadStillExists("direct-runner-worker")) {
>> Thread.sleep(100);
>> }
>>
>> // remove all solrClients created in this execution only
>> // since the teardown may not have done so
>> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>> if (o instanceof SolrClient && !existingClients.contains(o)) {
>> ObjectReleaseTracker.release(o);
>> }
>> }
>>
>> // now we can do our assertions
>> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.
>> RETRY_ATTEMPT_LOG, 1));
>>
>>
>> Please do point out the obvious if I am missing it - I am a newbie here...
>>
>> Thank you all very much,
>> Tim
>> (timrobertson100@gmail.com on the slack apache/beam channel)
>>
>>
>>
>>
>

Re: DirectRunner in test - await completion of workers threads?

Posted by Tim Robertson <ti...@gmail.com>.
Thanks for confirming Romain - also for the very fast reply!

I'll continue with the workaround and reference BEAM-3409 inline as
justification.
I'm trying to wrap this up before travel next week, but if I get a chance
I'll try and run this scenario (BEAM-3848) with your patch.



On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Hi
>
> I have the same blocker and created
>
> https://github.com/apache/beam/pull/4790 and https://github.com/apache/
> beam/pull/4965 to solve part of it
>
>
>
> Le 1 avr. 2018 11:35, "Tim Robertson" <ti...@gmail.com> a
> écrit :
>
> Hi devs
>
> I'm working on SolrIO tests for failure scenarios (i.e. an exception will
> come out of the pipeline execution).  I see that the exception is surfaced
> to the driver while "direct-runner-worker" threads are still running.
> This causes issue because:
>
>   1. The Solr tests do thread leak detection, and a solrClient.close() is
> what removes the object
>   2. @Teardown is not necessarily called which is what would close the
> solrClient
>
> I can unregister all the solrClients that have been spawned.  However I
> have seen race conditions where there are still threads running creating
> and registering clients. I need to someone ensure that all workers related
> to the pipeline execution are indeed finished so no new ones are created
> after the first exception is passed up.
>
> Currently I have this (psuedo code) which works, but I suspect someone can
> suggest a better approach:
>
> // store the state of clients registered for object leak check
> Set<Object> existingClients = registeredSolrClients();
> try {
>   pipeline.run();
>
> } catch (Pipeline.PipelineExecutionException e) {
>
>
> // Hack: await all bundle workers completing
> while (namedThreadStillExists("direct-runner-worker")) {
> Thread.sleep(100);
> }
>
> // remove all solrClients created in this execution only
> // since the teardown may not have done so
> for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
> if (o instanceof SolrClient && !existingClients.contains(o)) {
> ObjectReleaseTracker.release(o);
> }
> }
>
> // now we can do our assertions
> expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.R
> ETRY_ATTEMPT_LOG, 1));
>
>
> Please do point out the obvious if I am missing it - I am a newbie here...
>
> Thank you all very much,
> Tim
> (timrobertson100@gmail.com on the slack apache/beam channel)
>
>
>
>

Re: DirectRunner in test - await completion of workers threads?

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Hi

I have the same blocker and created

https://github.com/apache/beam/pull/4790 and
https://github.com/apache/beam/pull/4965 to solve part of it



Le 1 avr. 2018 11:35, "Tim Robertson" <ti...@gmail.com> a écrit :

Hi devs

I'm working on SolrIO tests for failure scenarios (i.e. an exception will
come out of the pipeline execution).  I see that the exception is surfaced
to the driver while "direct-runner-worker" threads are still running.  This
causes issue because:

  1. The Solr tests do thread leak detection, and a solrClient.close() is
what removes the object
  2. @Teardown is not necessarily called which is what would close the
solrClient

I can unregister all the solrClients that have been spawned.  However I
have seen race conditions where there are still threads running creating
and registering clients. I need to someone ensure that all workers related
to the pipeline execution are indeed finished so no new ones are created
after the first exception is passed up.

Currently I have this (psuedo code) which works, but I suspect someone can
suggest a better approach:

// store the state of clients registered for object leak check
Set<Object> existingClients = registeredSolrClients();
try {
  pipeline.run();

} catch (Pipeline.PipelineExecutionException e) {


// Hack: await all bundle workers completing
while (namedThreadStillExists("direct-runner-worker")) {
Thread.sleep(100);
}

// remove all solrClients created in this execution only
// since the teardown may not have done so
for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
if (o instanceof SolrClient && !existingClients.contains(o)) {
ObjectReleaseTracker.release(o);
}
}

// now we can do our assertions
expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG,
1));


Please do point out the obvious if I am missing it - I am a newbie here...

Thank you all very much,
Tim
(timrobertson100@gmail.com on the slack apache/beam channel)