You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink) is not preserved in this plain-text version.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/09/28 13:35:43 UTC

oozie git commit: OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 33aec49da -> 4aa7bbde3


OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4aa7bbde
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4aa7bbde
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4aa7bbde

Branch: refs/heads/master
Commit: 4aa7bbde3570123833ab7ba2c51b7351647d1deb
Parents: 33aec49
Author: Andras Piros <an...@cloudera.com>
Authored: Fri Sep 28 15:34:42 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Fri Sep 28 15:34:42 2018 +0200

----------------------------------------------------------------------
 .../oozie/action/ssh/SshActionExecutor.java     | 36 ++++++++++++++++----
 release-log.txt                                 |  1 +
 2 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4aa7bbde/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
index 1c76a87..3e0e3c5 100644
--- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
@@ -740,8 +740,10 @@ public class SshActionExecutor extends ActionExecutor {
      * @return the exit value of the processSettings.
      * @throws IOException
      */
-    private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
+    private int drainBuffers(final Process p, final StringBuffer inputBuffer, final StringBuffer errorBuffer, final int maxLength)
             throws IOException {
+        LOG.trace("drainBuffers() start");
+
         int exitValue = -1;
 
         int inBytesRead = 0;
@@ -749,22 +751,42 @@ public class SshActionExecutor extends ActionExecutor {
 
         boolean processEnded = false;
 
-        try (BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream(), Charsets.UTF_8));
-             BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream(), Charsets.UTF_8))) {
+        try (final BufferedReader ir = new BufferedReader(new InputStreamReader(p.getInputStream(), Charsets.UTF_8));
+             final BufferedReader er = new BufferedReader(new InputStreamReader(p.getErrorStream(), Charsets.UTF_8))) {
+            // This is a form of busy waiting: we check whether the process has finished by calling Process#exitValue().
+            // If it has not finished yet, an IllegalThreadStateException is thrown and ignored, whatever is available on
+            // stdout and stderr is drained, and the check is retried until the process has ended.
+            // Note that Process#waitFor() may block in some cases, which is why we poll using Process#exitValue() instead.
+            // This solution should stay in place until unit and integration test coverage for the SSH action is extended
+            // and more sophisticated error handling can be introduced based on that coverage.
             while (!processEnded) {
                 try {
-                    exitValue = p.waitFor();
+                    // Doesn't block but throws IllegalThreadStateException if the process hasn't finished yet
+                    exitValue = p.exitValue();
                     processEnded = true;
                 }
-                catch (final IllegalThreadStateException | InterruptedException e) {
-                    LOG.warn("An exception occurred while waiting for the process, continuing to drain. " +
-                            "[e.message={0}]", e.getMessage());
+                catch (final IllegalThreadStateException itse) {
+                    // Continue to drain
                 }
 
+                // Drain input and error streams
                 inBytesRead += drainBuffer(ir, inputBuffer, maxLength, inBytesRead, processEnded);
                 errBytesRead += drainBuffer(er, errorBuffer, maxLength, errBytesRead, processEnded);
+
+                // Necessary evil: sleep and retry
+                if (!processEnded) {
+                    try {
+                        Thread.sleep(500);
+                    }
+                    catch (final InterruptedException ie) {
+                        // Interrupted while sleeping: ignored (interrupt status is not restored), poll again
+                    }
+                }
             }
         }
+
+        LOG.trace("drainBuffers() end [exitValue={0}]", exitValue);
+
         return exitValue;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/4aa7bbde/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ff51553..d4d0b1e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3354 [core] [SSH action] SSH action gets hung (andras.piros)
 OOZIE-3343 amend [build] [tests] Add the first five test errors per module to the report (kmarton via andras.piros)
 OOZIE-3348 [Hive action] Remove dependency hive-contrib (kmarton via andras.piros)
 OOZIE-3340 [fluent-job] Create error handler ACTION only if needed (kmarton, andras.piros)