You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2016/10/04 17:50:54 UTC
svn commit: r1763313 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/streaming/ExecutableManager.java
src/org/apache/pig/impl/streaming/OutputHandler.java
test/org/apache/pig/test/TestStreamingLocal.java
Author: knoguchi
Date: Tue Oct 4 17:50:53 2016
New Revision: 1763313
URL: http://svn.apache.org/viewvc?rev=1763313&view=rev
Log:
PIG-4976: streaming job with store clause stuck if the script fail (daijy via knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 4 17:50:53 2016
@@ -48,6 +48,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4976: streaming job with store clause stuck if the script fail (daijy via knoguchi)
+
PIG-5035: killJob API does not work in Tez (zjffdu via rohini)
PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)
Modified: pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Oct 4 17:50:53 2016
@@ -150,12 +150,13 @@ public class ExecutableManager {
LOG.debug("Process exited with: " + exitCode);
if (exitCode != SUCCESS) {
- LOG.error(command + " failed with exit status: "
- + exitCode);
+ String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode;
+ LOG.error(errMsg);
+ Result res = new Result(POStatus.STATUS_ERR, errMsg);
+ sendOutput(poStream.getBinaryOutputQueue(), res);
}
- if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
-
+ if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
// Trigger the outputHandler
outputHandler.bindTo("", null, 0, -1);
@@ -178,10 +179,18 @@ public class ExecutableManager {
* @param process the process to be killed
* @throws IOException
*/
- private void killProcess(Process process) throws IOException {
+ private void killProcess(Process process) {
if (process != null) {
- inputHandler.close(process);
- outputHandler.close();
+ try {
+ inputHandler.close(process);
+ } catch (Exception e) {
+ LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage());
+ }
+ try {
+ outputHandler.close();
+ } catch (Exception e) {
+ LOG.info("Exception in killProcess while closing outputHandler. Ignoring:" + e.getMessage());
+ }
process.destroy();
}
}
@@ -334,7 +343,7 @@ public class ExecutableManager {
// we will only call close() here and not
// worry about deducing whether the process died
// normally or abnormally - if there was any real
- // issue the ProcessOutputThread should see
+ // issue we should see
// a non zero exit code from the process and send
// a POStatus.STATUS_ERR back - what if we got
// an IOException because there was only an issue with
@@ -344,14 +353,6 @@ public class ExecutableManager {
return;
} else {
// asynchronous case - then this is a real exception
- LOG.error("Exception while trying to write to stream binary's input", e);
- // send POStatus.STATUS_ERR to POStream to signal the error
- // Generally the ProcessOutputThread would do this but now
- // we should do it here since neither the process nor the
- // ProcessOutputThread will ever be spawned
- Result res = new Result(POStatus.STATUS_ERR,
- "Exception while trying to write to stream binary's input" + e.getMessage());
- sendOutput(poStream.getBinaryOutputQueue(), res);
throw e;
}
}
@@ -362,13 +363,13 @@ public class ExecutableManager {
} catch (Throwable t) {
// Note that an error occurred
outerrThreadsError = t;
- LOG.error( "Error while reading from POStream and " +
- "passing it to the streaming process", t);
- try {
- killProcess(process);
- } catch (IOException ioe) {
- LOG.warn(ioe);
- }
+ Result res = new Result(POStatus.STATUS_ERR,
+ "Error while reading from POStream and " +
+ "passing it to the streaming process:" + t.getMessage());
+ LOG.error("Error while reading from POStream and " +
+ "passing it to the streaming process:", t);
+ sendOutput(poStream.getBinaryOutputQueue(), res);
+ killProcess(process);
}
}
}
@@ -452,13 +453,7 @@ public class ExecutableManager {
try {
exitCode = process.waitFor();
} catch (InterruptedException ie) {
- try {
- killProcess(process);
- } catch (IOException e) {
- LOG.warn("Exception trying to kill process while processing null output " +
- "from binary", e);
-
- }
+ killProcess(process);
// signal error
String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
ie.getMessage();
Modified: pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Oct 4 17:50:53 2016
@@ -175,8 +175,10 @@ public abstract class OutputHandler {
*/
public synchronized void close() throws IOException {
if(!alreadyClosed) {
- istream.close();
- istream = null;
+ if( istream != null ) {
+ istream.close();
+ istream = null;
+ }
alreadyClosed = true;
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=1763313&r1=1763312&r2=1763313&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Tue Oct 4 17:50:53 2016
@@ -18,6 +18,7 @@
package org.apache.pig.test;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -372,4 +373,41 @@ public class TestStreamingLocal {
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
}
+
+ @Test
+ // Perl script with a syntax error, See PIG-4976
+ public void testNegativeScriptSyntaxError() throws IOException {
+
+ for( int numinput : new int [] {10, 9999} ) {
+ String[] inputStrings = new String[numinput];
+ for (int i=0;i<numinput;i++) {
+ inputStrings[i] = Integer.toString(i);
+ }
+ File input = Util.createInputFile("tmp", "", inputStrings);
+ // Perl script
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "syntax error",
+ };
+ File command1 = Util.createInputFile("script", "pl", script);
+ String query =
+ "define CMD `perl " + command1.getName() + "` output('foo')" +
+ "ship ('" + Util.encodeEscape(command1.toString()) + "');";
+ boolean succeeded=true;
+ try {
+ pigServer.registerQuery( query );
+ pigServer.registerQuery("A = load '"
+ + Util.generateURI(input.toString(),
+ pigServer.getPigContext())
+ + "' using PigStorage();");
+ pigServer.registerQuery("B = stream A through CMD;");
+ pigServer.openIterator("B");
+ } catch(Exception ex) {
+ succeeded=false;
+ }
+ Assert.assertFalse("Job with " + numinput + " lines input did not fail.", succeeded);
+ }
+ }
+
}