Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/01 23:19:15 UTC

[GitHub] [beam] tudorm opened a new pull request #12994: Add support for gracefully aborting workers.

tudorm opened a new pull request #12994:
URL: https://github.com/apache/beam/pull/12994


   When the work progress updater thread receives complete_work_status, abort the corresponding map task executor thread. This relies on Thread.interrupt() to unblock the worker thread if it is blocked and to raise an exception; the backend ignores that exception because it has already decided to abort this thread. The values iterator must also check the current thread for interrupts while it consumes a stream of values behind the same key; otherwise control does not return to runReadLoop() to check for the interruption.
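
A minimal, self-contained sketch of the mechanism the description relies on: `Thread.interrupt()` makes a thread that is blocked in an interruptible call throw `InterruptedException`. This is not the Beam worker code. The class name `InterruptUnblocksDemo` is hypothetical, and `Thread.sleep` is only a stand-in for whatever blocking call the worker thread might be sitting in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo class; not part of the Beam worker. */
final class InterruptUnblocksDemo {

  /**
   * Starts a worker that blocks, interrupts it from the calling thread, and returns true if the
   * worker observed InterruptedException and terminated.
   */
  static boolean interruptBlockedWorker() {
    CountDownLatch started = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    Thread worker =
        new Thread(
            () -> {
              started.countDown();
              try {
                // Stand-in for a blocking call; without the interrupt this waits a minute.
                Thread.sleep(60_000);
              } catch (InterruptedException e) {
                sawInterrupt.set(true);
              }
            });
    worker.start();
    try {
      started.await();
      // Unblocks sleep(); if sleep() has not started yet, it throws as soon as it is entered.
      worker.interrupt();
      worker.join(10_000);
    } catch (InterruptedException e) {
      // The calling thread itself was interrupted; restore its status and give up.
      Thread.currentThread().interrupt();
      return false;
    }
    return sawInterrupt.get() && !worker.isAlive();
  }
}
```

A thread that is running rather than blocked receives no exception from the interrupt; only its interrupt status is set, which is why the description adds an explicit check in the values iterator.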


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498937353



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       Internally within Flume, we saw cases with the C++ worker where user code didn't handle thread::cancel correctly and relied on a process crash to avoid getting stuck.
   
   I would generally love for us to be able to interrupt arbitrary code at any point, as it would be the best and cleanest way to do this. If we want to do this, we should really have a way for users to opt out in case it doesn't work for them. We can watch how many people opt out and find out why, and then possibly make this the default forever, or choose to make it the default in portable execution, as that will require a migration.







[GitHub] [beam] kennknowles commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498998483



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       Discussed offline: "user" code can mean many things, and there are some common Java errors that this change could trigger:
   
   1. Users (who usually won't check the bit) doing catch-all error handling without separating out InterruptedException.
   2. IO libraries (which may often check the bit) doing incomplete or incorrect cleanup and being left in an inconsistent state or leaking resources.
   
   These would ideally both be noticed and handled by higher-level mechanisms since in both cases something should be notably unhealthy about the thread, process, or VM. For now just aborting without interrupting the thread is safer.
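
To make the first failure mode concrete, here is a minimal sketch (hypothetical names throughout, not taken from Beam or from any user pipeline) contrasting a catch-all handler that silently loses the interrupt with one that handles `InterruptedException` separately and restores the interrupt status. `blockingCall` simulates an interrupted blocking operation by interrupting the current thread before calling `Thread.sleep`.

```java
import java.util.concurrent.Callable;

/** Hypothetical demo class illustrating catch-all handling of InterruptedException. */
final class CatchAllInterruptDemo {

  /** Pitfall: the catch-all also catches InterruptedException, so the interrupt is lost. */
  static boolean swallowing(Callable<Void> userCode) {
    try {
      userCode.call();
    } catch (Exception e) {
      // InterruptedException lands here too; the interrupt status is already cleared.
    }
    return Thread.currentThread().isInterrupted();
  }

  /** Safer: treat InterruptedException separately and restore the interrupt status. */
  static boolean restoring(Callable<Void> userCode) {
    try {
      userCode.call();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      // Other failures would be handled here.
    }
    return Thread.currentThread().isInterrupted();
  }

  /** Simulates a blocking call that is interrupted. */
  static Void blockingCall() throws InterruptedException {
    Thread.currentThread().interrupt();
    Thread.sleep(1_000); // Throws immediately and clears the interrupt status.
    return null;
  }
}
```

With `swallowing`, code further up the stack has no way to tell that an abort was requested; with `restoring`, a later check of the interrupt status still sees it.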







[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498935779



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       Interrupting the GCS writer has caused issues: the pipe used to transfer data is not shut down gracefully, which leaves a blocked thread sitting around indefinitely.







[GitHub] [beam] kennknowles commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498944776



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       I may be misunderstanding, but this is a cooperative interrupt. The interrupt only happens when the user code returns control.
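
A small sketch of what "cooperative" means here, using hypothetical names and a self-interrupt so that the result is deterministic: calling `interrupt()` only sets a flag on the target thread, and a loop that never polls that flag (and never enters an interruptible blocking call) runs to completion regardless.

```java
/** Hypothetical demo class; shows that interruption is a flag the code must poll. */
final class CooperativeInterruptDemo {

  /** Counts up to limit; stops early only if checkInterrupt is true and the flag is set. */
  static int countUntilInterrupted(int limit, boolean checkInterrupt) {
    int processed = 0;
    while (processed < limit) {
      if (checkInterrupt && Thread.currentThread().isInterrupted()) {
        break;
      }
      processed++;
    }
    return processed;
  }

  /** Returns {iterations done when ignoring the flag, iterations done when polling it}. */
  static int[] run() {
    Thread.currentThread().interrupt(); // Only sets a flag; nothing is stopped forcibly.
    int ignoring = countUntilInterrupted(1_000, false);
    int polling = countUntilInterrupted(1_000, true);
    Thread.interrupted(); // Clear the flag so the demo leaves no side effects.
    return new int[] {ignoring, polling};
  }
}
```

The loop that ignores the flag finishes all of its iterations, while the polling loop stops before doing any work.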







[GitHub] [beam] tudorm commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
tudorm commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498598992



##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
##########
@@ -483,4 +483,29 @@ public void testExceptionInAbortSuppressed() throws Exception {
       assertThat(e.getSuppressed()[0].getMessage(), equalTo("suppressed in abort"));
     }
   }
+
+  @Test
+  public void testAbort() throws Exception {
+    // Operation must be an instance of ReadOperation or ReceivingOperation per preconditions
+    // in MapTaskExecutor.
+    Operation o = Mockito.mock(ReadOperation.class);
+
+    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    try (MapTaskExecutor executor =
+        new MapTaskExecutor(Arrays.<Operation>asList(o), counterSet, stateTracker)) {
+      Mockito.doAnswer(
+              invocation -> {
+                executor.abort();
+                return null;
+              })
+          .when(o)
+          .start();
+      executor.execute();
+      fail("Should have aborted");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InterruptedException);

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
##########
@@ -483,4 +483,29 @@ public void testExceptionInAbortSuppressed() throws Exception {
       assertThat(e.getSuppressed()[0].getMessage(), equalTo("suppressed in abort"));
     }
   }
+
+  @Test
+  public void testAbort() throws Exception {
+    // Operation must be an instance of ReadOperation or ReceivingOperation per preconditions
+    // in MapTaskExecutor.
+    Operation o = Mockito.mock(ReadOperation.class);
+
+    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    try (MapTaskExecutor executor =
+        new MapTaskExecutor(Arrays.<Operation>asList(o), counterSet, stateTracker)) {
+      Mockito.doAnswer(
+              invocation -> {
+                executor.abort();
+                return null;
+              })
+          .when(o)
+          .start();
+      executor.execute();
+      fail("Should have aborted");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InterruptedException);
+      Mockito.verify(o).abort();
+    }
+    Assert.assertTrue(Thread.currentThread().isInterrupted());

Review comment:
       Done







[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498894067



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       There is no requirement for user code to handle being interrupted arbitrarily and there are enough instances that I have seen where this is not handled gracefully by the user.







[GitHub] [beam] tudorm commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
tudorm commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-702445980


   R: @kennknowles 





[GitHub] [beam] lukecwik merged pull request #12994: [BEAM-11044] Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #12994:
URL: https://github.com/apache/beam/pull/12994


   





[GitHub] [beam] tudorm commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
tudorm commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-702493438


    retest this please





[GitHub] [beam] tudorm commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
tudorm commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-704002909


   Per offline discussion, removed the delivery of the thread interrupt to the worker thread, and instead added async abort signaling up to the GroupingShuffleReader's (values) iterator.





[GitHub] [beam] kennknowles commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-702820329


   Run Java PreCommit








[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r501855713



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java
##########
@@ -402,6 +432,14 @@ public boolean hasNext() {
 
       @Override
       public V next() {
+        // Given that the underlying ReadOperation already checks the abort status after every
+        // record it advances over (i.e., for every distinct key), we skip the check when at
+        // the first value as that is redundant. Signal by thread interruption may be better, but
+        // it may also have unintended side-effects.
+        if (!atFirstValue && aborted.get()) {

Review comment:
       Checking both atFirstValue and aborted will likely perform worse than just checking aborted every time. The check may seem redundant, but the abort happens asynchronously, so we may already have gotten past the check in the ReadOperation.
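
A rough sketch of the shape being suggested, assuming a shared `AtomicBoolean` flag like the `aborted` field in the hunk above. `AbortableIterator` and the `IllegalStateException` it throws are hypothetical and chosen only for illustration; the exception the real reader throws is not shown in this thread.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical wrapper; not the actual GroupingShuffleReader values iterator. */
final class AbortableIterator<V> implements Iterator<V> {
  private final Iterator<V> delegate;
  private final AtomicBoolean aborted;

  AbortableIterator(Iterator<V> delegate, AtomicBoolean aborted) {
    this.delegate = delegate;
    this.aborted = aborted;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public V next() {
    // Check on every call: the flag is set asynchronously by another thread, so an
    // earlier check elsewhere may already be stale by the time we get here.
    if (aborted.get()) {
      throw new IllegalStateException("iteration aborted"); // hypothetical exception type
    }
    return delegate.next();
  }
}
```

The unconditional check is a single volatile read per value, with no extra branch on a first-value flag.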







[GitHub] [beam] kennknowles commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-702819952


   Just adding the evidence to our bugs tracking the flakes. I will keep running a few more times.





[GitHub] [beam] kennknowles commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498933467



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       Can you give an example to consider here?







[GitHub] [beam] kennknowles commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498595635



##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
##########
@@ -483,4 +483,29 @@ public void testExceptionInAbortSuppressed() throws Exception {
       assertThat(e.getSuppressed()[0].getMessage(), equalTo("suppressed in abort"));
     }
   }
+
+  @Test
+  public void testAbort() throws Exception {
+    // Operation must be an instance of ReadOperation or ReceivingOperation per preconditions
+    // in MapTaskExecutor.
+    Operation o = Mockito.mock(ReadOperation.class);
+
+    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    try (MapTaskExecutor executor =
+        new MapTaskExecutor(Arrays.<Operation>asList(o), counterSet, stateTracker)) {
+      Mockito.doAnswer(
+              invocation -> {
+                executor.abort();
+                return null;
+              })
+          .when(o)
+          .start();
+      executor.execute();
+      fail("Should have aborted");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InterruptedException);
+      Mockito.verify(o).abort();
+    }
+    Assert.assertTrue(Thread.currentThread().isInterrupted());

Review comment:
       Same

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
##########
@@ -483,4 +483,29 @@ public void testExceptionInAbortSuppressed() throws Exception {
       assertThat(e.getSuppressed()[0].getMessage(), equalTo("suppressed in abort"));
     }
   }
+
+  @Test
+  public void testAbort() throws Exception {
+    // Operation must be an instance of ReadOperation or ReceivingOperation per preconditions
+    // in MapTaskExecutor.
+    Operation o = Mockito.mock(ReadOperation.class);
+
+    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    try (MapTaskExecutor executor =
+        new MapTaskExecutor(Arrays.<Operation>asList(o), counterSet, stateTracker)) {
+      Mockito.doAnswer(
+              invocation -> {
+                executor.abort();
+                return null;
+              })
+          .when(o)
+          .start();
+      executor.execute();
+      fail("Should have aborted");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InterruptedException);

Review comment:
       It would be nice for the next person if there is a diagnostic error message attached to this failure, lest we get a failure that says "false is not true".
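
       A minimal sketch of what such a diagnostic message could look like. JUnit 4 provides an `Assert.assertTrue(String message, boolean condition)` overload; the helper `describeUnexpected` and the local `assertTrue` stand-in below are hypothetical and are not part of this PR. They exist only so the sketch runs without JUnit on the classpath.

```java
public class DiagnosticAssertSketch {
  // Hypothetical helper: builds the text that would be passed as the first
  // argument of JUnit 4's Assert.assertTrue(String message, boolean condition).
  public static String describeUnexpected(Throwable e) {
    return "Expected InterruptedException from execute() but got "
        + e.getClass().getName()
        + ": "
        + e.getMessage();
  }

  // Local stand-in for Assert.assertTrue(String, boolean), so that the
  // sketch does not depend on JUnit.
  public static void assertTrue(String message, boolean condition) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Simulate the test catching the wrong exception type.
    Exception e = new IllegalStateException("boom");
    try {
      assertTrue(describeUnexpected(e), e instanceof InterruptedException);
    } catch (AssertionError failure) {
      // The failure now names the unexpected exception instead of
      // reporting a bare boolean mismatch.
      System.out.println(failure.getMessage());
    }
  }
}
```

       In the test under review, the same message could presumably be passed straight to the existing `Assert.assertTrue` call.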







[GitHub] [beam] kennknowles commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-702796593


   Run Java PreCommit





[GitHub] [beam] kennknowles commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498934933



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       My understanding of this change is that user code will typically finish a callback and the abort will occur when control returns to the worker. To a user it should be transparently like other failures. Is that not the case?







[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498937353



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       Internally within Flume, we saw cases with the C++ worker where user code didn't handle thread::cancel correctly and was relying on a process crash to avoid getting stuck.
   
   I would generally love for us to be able to interrupt random code arbitrarily as it would be the best and cleanest way to do this.







[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r498894067



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
##########
@@ -146,13 +165,17 @@ public boolean supportsRestart() {
   @Override
   public void abort() {
     // Signal the read loop to abort on the next record.
-    // TODO: Also interrupt the execution thread.
     for (Operation op : operations) {
       Preconditions.checkState(op instanceof ReadOperation || op instanceof ReceivingOperation);
       if (op instanceof ReadOperation) {
         ((ReadOperation) op).abortReadLoop();
       }
     }
+    synchronized (this) {
+      if (currentExecutorThread != null) {
+        currentExecutorThread.interrupt();

Review comment:
       There is no requirement for user code to handle being interrupted arbitrarily, and I have seen enough instances where user code would not handle this gracefully.
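
       One common way for code to mishandle an interrupt is to catch `InterruptedException` and discard it, which clears the thread's interrupt status so the caller never observes it. The sketch below illustrates that general Java behaviour only; the class and method names are hypothetical, and the `Thread.sleep` call merely stands in for a blocking call inside user code. It is not taken from the PR.

```java
import java.util.concurrent.CountDownLatch;

public class InterruptSketch {
  // Runs simulated "user code" on a worker thread, interrupts it, and
  // reports whether the interrupt flag is still set once that code returns.
  public static boolean flagVisibleAfter(boolean restoreFlag) throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    boolean[] flagSeen = new boolean[1];

    Thread worker =
        new Thread(
            () -> {
              started.countDown();
              try {
                Thread.sleep(60_000); // stands in for a blocking call in user code
              } catch (InterruptedException e) {
                // Throwing InterruptedException clears the interrupt status.
                if (restoreFlag) {
                  // Well-behaved code re-asserts the flag for its caller.
                  Thread.currentThread().interrupt();
                }
                // Otherwise the interrupt is silently swallowed.
              }
              flagSeen[0] = Thread.currentThread().isInterrupted();
            });

    worker.start();
    started.await();
    worker.interrupt();
    worker.join(); // join() makes the worker's write to flagSeen visible here
    return flagSeen[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("flag restored:  " + flagVisibleAfter(true));
    System.out.println("flag swallowed: " + flagVisibleAfter(false));
  }
}
```

       When the flag is swallowed, a framework that checks `Thread.currentThread().isInterrupted()` after the callback returns would see nothing, which is one reason an interrupt-based abort may need a separate signal as well.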







[GitHub] [beam] lukecwik commented on a change in pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12994:
URL: https://github.com/apache/beam/pull/12994#discussion_r501855713



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java
##########
@@ -402,6 +432,14 @@ public boolean hasNext() {
 
       @Override
       public V next() {
+        // Given that the underlying ReadOperation already checks the abort status after every
+        // record it advances over (i.e., for every distinct key), we skip the check when at
+        // the first value as that is redundant. Signal by thread interruption may be better, but
+        // it may also have unintended side-effects.
+        if (!atFirstValue && aborted.get()) {

Review comment:
       Checking atFirstValue and aborted will likely perform worse than just checking aborted all the time. It may seem redundant, but the abort happens asynchronously, so we may have already gotten past the check in the ReadOperation.
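
       A minimal sketch of the unconditional check suggested here, assuming a shared `AtomicBoolean` abort flag as in the diff. The wrapper class `AbortCheckingIterator` and the use of `IllegalStateException` are placeholders for illustration; the real `GroupingShuffleReader` iterator and its exception type may differ.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

public class AbortCheckingIterator<V> implements Iterator<V> {
  private final Iterator<V> delegate;
  private final AtomicBoolean aborted;

  public AbortCheckingIterator(Iterator<V> delegate, AtomicBoolean aborted) {
    this.delegate = delegate;
    this.aborted = aborted;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public V next() {
    // Check the flag on every call, including the first value: the abort is
    // set asynchronously by another thread, so an earlier check elsewhere
    // may already have been passed.
    if (aborted.get()) {
      // Placeholder exception type for this sketch.
      throw new IllegalStateException("Values iterator aborted");
    }
    return delegate.next();
  }

  public static void main(String[] args) {
    AtomicBoolean aborted = new AtomicBoolean(false);
    Iterator<Integer> values =
        new AbortCheckingIterator<>(Arrays.asList(1, 2, 3).iterator(), aborted);

    System.out.println(values.next()); // flag not set, value is returned
    aborted.set(true); // in the worker, another thread would set this
    try {
      values.next();
    } catch (IllegalStateException e) {
      System.out.println("aborted: " + e.getMessage());
    }
  }
}
```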







[GitHub] [beam] tudorm commented on pull request #12994: Add support for gracefully aborting workers.

Posted by GitBox <gi...@apache.org>.
tudorm commented on pull request #12994:
URL: https://github.com/apache/beam/pull/12994#issuecomment-705243648


   Run Java PreCommit

