You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2016/10/04 17:50:54 UTC

svn commit: r1763313 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/streaming/ExecutableManager.java src/org/apache/pig/impl/streaming/OutputHandler.java test/org/apache/pig/test/TestStreamingLocal.java

Author: knoguchi
Date: Tue Oct  4 17:50:53 2016
New Revision: 1763313

URL: http://svn.apache.org/viewvc?rev=1763313&view=rev
Log:
PIG-4976: streaming job with store clause stuck if the script fails (daijy via knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct  4 17:50:53 2016
@@ -48,6 +48,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4976: streaming job with store clause stuck if the script fail (daijy via knoguchi)
+
 PIG-5035: killJob API does not work in Tez (zjffdu via rohini)
 
 PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)

Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Oct  4 17:50:53 2016
@@ -150,12 +150,13 @@ public class ExecutableManager {
 
         LOG.debug("Process exited with: " + exitCode);
         if (exitCode != SUCCESS) {
-            LOG.error(command + " failed with exit status: "
-                    + exitCode);
+            String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode;
+            LOG.error(errMsg);
+            Result res = new Result(POStatus.STATUS_ERR, errMsg);
+            sendOutput(poStream.getBinaryOutputQueue(), res);
         }
 
-        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
-
+        if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
             // Trigger the outputHandler
             outputHandler.bindTo("", null, 0, -1);
 
@@ -178,10 +179,18 @@ public class ExecutableManager {
      * @param process the process to be killed
      * @throws IOException
      */
-    private void killProcess(Process process) throws IOException {
+    private void killProcess(Process process) {
         if (process != null) {
-            inputHandler.close(process);
-            outputHandler.close();
+            try {
+                inputHandler.close(process);
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage());
+            }
+            try {
+                outputHandler.close();
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing outputHandler. Ignoring:" + e.getMessage());
+            }
             process.destroy();
         }
     }
@@ -334,7 +343,7 @@ public class ExecutableManager {
                                 // we will only call close() here and not
                                 // worry about deducing whether the process died
                                 // normally or abnormally - if there was any real
-                                // issue the ProcessOutputThread should see
+                                // issue we should see
                                 // a non zero exit code from the process and send
                                 // a POStatus.STATUS_ERR back - what if we got
                                 // an IOException because there was only an issue with
@@ -344,14 +353,6 @@ public class ExecutableManager {
                                 return;
                             } else {
                                 // asynchronous case - then this is a real exception
-                                LOG.error("Exception while trying to write to stream binary's input", e);
-                                // send POStatus.STATUS_ERR to POStream to signal the error
-                                // Generally the ProcessOutputThread would do this but now
-                                // we should do it here since neither the process nor the
-                                // ProcessOutputThread will ever be spawned
-                                Result res = new Result(POStatus.STATUS_ERR,
-                                        "Exception while trying to write to stream binary's input" + e.getMessage());
-                                sendOutput(poStream.getBinaryOutputQueue(), res);
                                 throw e;
                             }
                         }
@@ -362,13 +363,13 @@ public class ExecutableManager {
             } catch (Throwable t) {
                 // Note that an error occurred
                 outerrThreadsError = t;
-                LOG.error( "Error while reading from POStream and " +
-                           "passing it to the streaming process", t);
-                try {
-                    killProcess(process);
-                } catch (IOException ioe) {
-                    LOG.warn(ioe);
-                }
+                Result res = new Result(POStatus.STATUS_ERR,
+                                        "Error while reading from POStream and " +
+                                        "passing it to the streaming process:" + t.getMessage());
+                LOG.error("Error while reading from POStream and " +
+                          "passing it to the streaming process:", t);
+                sendOutput(poStream.getBinaryOutputQueue(), res);
+                killProcess(process);
             }
         }
     }
@@ -452,13 +453,7 @@ public class ExecutableManager {
                 try {
                     exitCode = process.waitFor();
                 } catch (InterruptedException ie) {
-                    try {
-                        killProcess(process);
-                    } catch (IOException e) {
-                        LOG.warn("Exception trying to kill process while processing null output " +
-                                "from binary", e);
-
-                    }
+                    killProcess(process);
                     // signal error
                     String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
                             ie.getMessage();

Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Oct  4 17:50:53 2016
@@ -175,8 +175,10 @@ public abstract class OutputHandler {
      */
     public synchronized void close() throws IOException {
         if(!alreadyClosed) {
-            istream.close();
-            istream = null;
+            if( istream != null ) {
+                istream.close();
+                istream = null;
+            }
             alreadyClosed = true;
         }
     }

Modified: pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Tue Oct  4 17:50:53 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
@@ -372,4 +373,41 @@ public class TestStreamingLocal {
             Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
     }
+
+    @Test
+    // Perl script with a syntax error, See PIG-4976
+    public void testNegativeScriptSyntaxError() throws IOException {
+
+        for( int numinput : new int [] {10, 9999} ) {
+            String[] inputStrings = new String[numinput];
+            for (int i=0;i<numinput;i++) {
+                inputStrings[i] = Integer.toString(i);
+            }
+            File input = Util.createInputFile("tmp", "", inputStrings);
+            // Perl script
+            String[] script =
+                new String[] {
+                              "#!/usr/bin/perl",
+                              "syntax error",
+                             };
+            File command1 = Util.createInputFile("script", "pl", script);
+            String query =
+                    "define CMD `perl " + command1.getName() + "` output('foo')" +
+                    "ship ('" + Util.encodeEscape(command1.toString()) + "');";
+            boolean succeeded=true;
+            try {
+                pigServer.registerQuery( query );
+                pigServer.registerQuery("A = load '"
+                        + Util.generateURI(input.toString(),
+                                pigServer.getPigContext())
+                        + "' using PigStorage();");
+                pigServer.registerQuery("B = stream A through CMD;");
+                pigServer.openIterator("B");
+            } catch(Exception ex) {
+                succeeded=false;
+            }
+           Assert.assertFalse("Job with " + numinput + " lines input did not fail.", succeeded);
+        }
+    }
+
 }