You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/27 19:30:20 UTC
svn commit: r1662776 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
Author: daijy
Date: Fri Feb 27 18:30:19 2015
New Revision: 1662776
URL: http://svn.apache.org/r1662776
Log:
PIG-4412: Race condition in writing multiple outputs from STREAM op
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1662776&r1=1662775&r2=1662776&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Feb 27 18:30:19 2015
@@ -50,6 +50,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4412: Race condition in writing multiple outputs from STREAM op (jwills via daijy)
+
PIG-4408: Merge join should support replicated join as a predecessor (bridiver via daijy)
PIG-4389: Flag to run selected test suites in e2e tests (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1662776&r1=1662775&r2=1662776&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Fri Feb 27 18:30:19 2015
@@ -165,8 +165,11 @@ public class POStream extends PhysicalOp
// map or reduce. So once we send this EOP down,
// getNext() in POStream should never be called. So
// we don't need to set any flag noting we saw all output
- // from binary
+ // from binary. We also need to be sure to finish the
+ // executable manager's execution so any side
+ // output files are correctly moved to HDFS.
r = EOP_RESULT;
+ finish();
} else if (r.returnStatus == POStatus.STATUS_OK)
illustratorMarkup(r.result, r.result, 0);
return(r);
@@ -201,8 +204,11 @@ public class POStream extends PhysicalOp
// map or reduce. So once we send this EOP down,
// getNext() in POStream should never be called. So
// we don't need to set any flag noting we saw all output
- // from binary
+ // from binary. We also need to be sure to shut down
+ // the executable manager so any side outputs are
+ // properly moved to HDFS.
r = EOP_RESULT;
+ finish();
}
}
@@ -217,6 +223,7 @@ public class POStream extends PhysicalOp
// should never be called. So we don't need to set any
// flag noting we saw all output from binary
r = EOP_RESULT;
+ finish();
} else if (r.returnStatus == POStatus.STATUS_OK)
illustratorMarkup(r.result, r.result, 0);
return r;
@@ -232,6 +239,7 @@ public class POStream extends PhysicalOp
// for future calls
r = EOP_RESULT;
allOutputFromBinaryProcessed = true;
+ finish();
} else if (r.returnStatus == POStatus.STATUS_OK)
illustratorMarkup(r.result, r.result, 0);
return r;
@@ -351,7 +359,10 @@ public class POStream extends PhysicalOp
*
*/
public void finish() throws IOException {
- executableManager.close();
+ if (executableManager != null) {
+ executableManager.close();
+ executableManager = null;
+ }
}
/**