You are viewing a plain-text version of this content; the canonical copy is the original message in the commits@pig.apache.org mailing-list archive.
Posted to commits@pig.apache.org by da...@apache.org on 2015/11/02 20:55:13 UTC
svn commit: r1712130 - in /pig/branches/branch-0.15: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
test/e2e/pig/tests/nightly.conf
Author: daijy
Date: Mon Nov 2 19:55:12 2015
New Revision: 1712130
URL: http://svn.apache.org/viewvc?rev=1712130&view=rev
Log:
PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true
Modified:
pig/branches/branch-0.15/CHANGES.txt
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Nov 2 19:55:12 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true (rohini)
+
PIG-4679: Performance degradation due to InputSizeReducerEstimator since PIG-3754 (daijy)
PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)
Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Nov 2 19:55:12 2015
@@ -26,26 +26,27 @@ import java.util.concurrent.ArrayBlockin
import java.util.concurrent.BlockingQueue;
import org.apache.pig.PigException;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.streaming.ExecutableManager;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.pen.util.ExampleTuple;
public class POStream extends PhysicalOperator {
private static final long serialVersionUID = 2L;
-
+
private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
private String executableManagerStr; // String representing ExecutableManager to use
- transient private ExecutableManager executableManager; // ExecutableManager to use
+ transient private ExecutableManager executableManager; // ExecutableManager to use
private StreamingCommand command; // Actual command to be run
private Properties properties;
@@ -67,7 +68,7 @@ public class POStream extends PhysicalOp
*/
private boolean isFetchable;
- public POStream(OperatorKey k, ExecutableManager executableManager,
+ public POStream(OperatorKey k, ExecutableManager executableManager,
StreamingCommand command, Properties properties) {
super(k);
this.executableManagerStr = executableManager.getClass().getName();
@@ -76,21 +77,21 @@ public class POStream extends PhysicalOp
// Setup streaming-specific properties
if (command.getShipFiles()) {
- parseShipCacheSpecs(command.getShipSpecs(),
+ parseShipCacheSpecs(command.getShipSpecs(),
properties, "pig.streaming.ship.files");
}
- parseShipCacheSpecs(command.getCacheSpecs(),
+ parseShipCacheSpecs(command.getCacheSpecs(),
properties, "pig.streaming.cache.files");
}
-
- private static void parseShipCacheSpecs(List<String> specs,
+
+ private static void parseShipCacheSpecs(List<String> specs,
Properties properties, String property) {
-
+
String existingValue = properties.getProperty(property, "");
if (specs == null || specs.size() == 0) {
return;
}
-
+
// Setup streaming-specific properties
StringBuffer sb = new StringBuffer();
Iterator<String> i = specs.iterator();
@@ -107,13 +108,13 @@ public class POStream extends PhysicalOp
sb.append(", ");
}
}
- properties.setProperty(property, sb.toString());
+ properties.setProperty(property, sb.toString());
}
public Properties getShipCacheProperties() {
return properties;
}
-
+
/**
* Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
* @return the {@link StreamingCommand} for this <code>StreamSpec</code>
@@ -121,17 +122,13 @@ public class POStream extends PhysicalOp
public StreamingCommand getCommand() {
return command;
}
-
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
- */
+
@Override
public Result getNextTuple() throws ExecException {
// The POStream Operator works with ExecutableManager to
// send input to the streaming binary and to get output
// from it. To achieve a tuple oriented behavior, two queues
- // are used - one for output from the binary and one for
+ // are used - one for output from the binary and one for
// input to the binary. In each getNext() call:
// 1) If there is no more output expected from the binary, an EOP is
// sent to successor
@@ -141,14 +138,14 @@ public class POStream extends PhysicalOp
// send input to the binary, then the next tuple from the
// predecessor is got and passed to the binary
try {
- // if we are being called AFTER all output from the streaming
+ // if we are being called AFTER all output from the streaming
// binary has already been sent to us then just return EOP
// The "allOutputFromBinaryProcessed" flag is set when we see
// an EOS (End of Stream output) from streaming binary
if(allOutputFromBinaryProcessed) {
- return new Result(POStatus.STATUS_EOP, null);
+ return EOP_RESULT;
}
-
+
// if we are here AFTER all map() calls have been completed
// AND AFTER we process all possible input to be sent to the
// streaming binary, then all we want to do is read output from
@@ -159,19 +156,16 @@ public class POStream extends PhysicalOp
// If we received EOS, it means all output
// from the streaming binary has been sent to us
// So we can send an EOP to the successor in
- // the pipeline. Also since we are being called
- // after all input from predecessor has been processed
- // it means we got here from a call from close() in
- // map or reduce. So once we send this EOP down,
- // getNext() in POStream should never be called. So
- // we don't need to set any flag noting we saw all output
- // from binary
+ // the pipeline and also note this condition
+ // for future calls
r = EOP_RESULT;
- } else if (r.returnStatus == POStatus.STATUS_OK)
+ allOutputFromBinaryProcessed = true;
+ } else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
+ }
return(r);
}
-
+
// if we are here, we haven't consumed all input to be sent
// to the streaming binary - check if we are being called
// from close() on the map or reduce
@@ -184,7 +178,7 @@ public class POStream extends PhysicalOp
// then "initialized" will be true. If not, just
// send EOP down.
if(getInitialized()) {
- // signal End of ALL input to the Executable Manager's
+ // signal End of ALL input to the Executable Manager's
// Input handler thread
binaryInputQueue.put(r);
// note this state for future calls
@@ -195,30 +189,24 @@ public class POStream extends PhysicalOp
// If we received EOS, it means all output
// from the streaming binary has been sent to us
// So we can send an EOP to the successor in
- // the pipeline. Also since we are being called
- // after all input from predecessor has been processed
- // it means we got here from a call from close() in
- // map or reduce. So once we send this EOP down,
- // getNext() in POStream should never be called. So
- // we don't need to set any flag noting we saw all output
- // from binary
+ // the pipeline and also note this condition
+ // for future calls
r = EOP_RESULT;
+ allOutputFromBinaryProcessed = true;
}
}
-
+
} else if(r.returnStatus == POStatus.STATUS_EOS) {
// If we received EOS, it means all output
// from the streaming binary has been sent to us
// So we can send an EOP to the successor in
- // the pipeline. Also we are being called
- // from close() in map or reduce (this is so because
- // only then this.parentPlan.endOfAllInput is true).
- // So once we send this EOP down, getNext() in POStream
- // should never be called. So we don't need to set any
- // flag noting we saw all output from binary
+ // the pipeline and also note this condition
+ // for future calls
r = EOP_RESULT;
- } else if (r.returnStatus == POStatus.STATUS_OK)
+ allOutputFromBinaryProcessed = true;
+ } else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
+ }
return r;
} else {
// we are not being called from close() - so
@@ -232,18 +220,19 @@ public class POStream extends PhysicalOp
// for future calls
r = EOP_RESULT;
allOutputFromBinaryProcessed = true;
- } else if (r.returnStatus == POStatus.STATUS_OK)
+ } else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
+ }
return r;
}
-
+
} catch(Exception e) {
int errCode = 2083;
String msg = "Error while trying to get next result in POStream.";
throw new ExecException(msg, errCode, PigException.BUG, e);
}
-
-
+
+
}
public synchronized boolean getInitialized() {
@@ -264,13 +253,13 @@ public class POStream extends PhysicalOp
Result res = binaryOutputQueue.take();
return res;
}
-
- // check if we can write tuples to
+
+ // check if we can write tuples to
// input of the process
if(binaryInputQueue.remainingCapacity() > 0) {
-
+
Result input = processInput();
- if(input.returnStatus == POStatus.STATUS_EOP ||
+ if(input.returnStatus == POStatus.STATUS_EOP ||
input.returnStatus == POStatus.STATUS_ERR) {
return input;
} else {
@@ -278,16 +267,16 @@ public class POStream extends PhysicalOp
// Only when we see the first tuple which can
// be sent as input to the binary we want
// to initialize the ExecutableManager and set
- // up the streaming binary - this is required in
+ // up the streaming binary - this is required in
// Unions due to a JOIN where there may never be
// any input to send to the binary in one of the map
// tasks - so we initialize only if we have to.
// initialize the ExecutableManager once
if(!initialized) {
// set up the executableManager
- executableManager =
+ executableManager =
(ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr);
-
+
try {
executableManager.configure(this);
executableManager.run();
@@ -295,22 +284,22 @@ public class POStream extends PhysicalOp
int errCode = 2084;
String msg = "Error while running streaming binary.";
throw new ExecException(msg, errCode, PigException.BUG, ioe);
- }
+ }
initialized = true;
}
-
+
// send this input to the streaming
// process
binaryInputQueue.put(input);
}
-
+
} else {
-
+
// wait for either input to be available
// or output to be consumed
while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty())
wait();
-
+
}
}
}
@@ -320,21 +309,22 @@ public class POStream extends PhysicalOp
throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
-
+
+ @Override
public String toString() {
return getAliasString() + "POStream" + "[" + command.toString() + "]"
+ " - " + mKey.toString();
}
-
+
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitStream(this);
-
+
}
@Override
public String name() {
- return toString();
+ return toString();
}
@Override
@@ -348,7 +338,7 @@ public class POStream extends PhysicalOp
}
/**
- *
+ *
*/
public void finish() throws IOException {
executableManager.close();
@@ -367,7 +357,7 @@ public class POStream extends PhysicalOp
public BlockingQueue<Result> getBinaryOutputQueue() {
return binaryOutputQueue;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Mon Nov 2 19:55:12 2015
@@ -689,7 +689,24 @@ store c into ':OUTPATH:';\,
store d into ':OUTPATH:'; #,
'java_params' => ['-Dpig.exec.mapPartAgg=true']
- },
+ },
+
+ {
+ #PIG-4707 Streaming and empty input
+
+ 'num' => 6,
+ 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
+ b = group a by name;
+ c = foreach b generate flatten(a);
+ d = stream c through `cat` as (name, age, gpa);
+ e = filter d by name == 'nonexistent';
+ SPLIT e into f if gpa > 2, g otherwise;
+ store f into ':OUTPATH:.1';
+ store g into ':OUTPATH:.2';
+ #,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+
+ },
],
},