You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/11/02 20:55:13 UTC

svn commit: r1712130 - in /pig/branches/branch-0.15: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java test/e2e/pig/tests/nightly.conf

Author: daijy
Date: Mon Nov  2 19:55:12 2015
New Revision: 1712130

URL: http://svn.apache.org/viewvc?rev=1712130&view=rev
Log:
PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Mon Nov  2 19:55:12 2015
@@ -28,6 +28,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-4707: [Pig on Tez] Streaming job hangs with pig.exec.mapPartAgg=true (rohini)
+
 PIG-4679: Performance degradation due to InputSizeReducerEstimator since PIG-3754 (daijy)
 
 PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)

Modified: pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Nov  2 19:55:12 2015
@@ -26,26 +26,27 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.pig.PigException;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.streaming.ExecutableManager;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.pen.util.ExampleTuple;
 
 public class POStream extends PhysicalOperator {
     private static final long serialVersionUID = 2L;
-    
+
     private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
 
     private String executableManagerStr;            // String representing ExecutableManager to use
-    transient private ExecutableManager executableManager;    // ExecutableManager to use 
+    transient private ExecutableManager executableManager;    // ExecutableManager to use
     private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
@@ -67,7 +68,7 @@ public class POStream extends PhysicalOp
      */
     private boolean isFetchable;
 
-    public POStream(OperatorKey k, ExecutableManager executableManager, 
+    public POStream(OperatorKey k, ExecutableManager executableManager,
                       StreamingCommand command, Properties properties) {
         super(k);
         this.executableManagerStr = executableManager.getClass().getName();
@@ -76,21 +77,21 @@ public class POStream extends PhysicalOp
 
         // Setup streaming-specific properties
         if (command.getShipFiles()) {
-            parseShipCacheSpecs(command.getShipSpecs(), 
+            parseShipCacheSpecs(command.getShipSpecs(),
                                 properties, "pig.streaming.ship.files");
         }
-        parseShipCacheSpecs(command.getCacheSpecs(), 
+        parseShipCacheSpecs(command.getCacheSpecs(),
                             properties, "pig.streaming.cache.files");
     }
-    
-    private static void parseShipCacheSpecs(List<String> specs, 
+
+    private static void parseShipCacheSpecs(List<String> specs,
             Properties properties, String property) {
-        
+
         String existingValue = properties.getProperty(property, "");
         if (specs == null || specs.size() == 0) {
             return;
         }
-        
+
         // Setup streaming-specific properties
         StringBuffer sb = new StringBuffer();
         Iterator<String> i = specs.iterator();
@@ -107,13 +108,13 @@ public class POStream extends PhysicalOp
                 sb.append(", ");
             }
         }
-        properties.setProperty(property, sb.toString());        
+        properties.setProperty(property, sb.toString());
     }
 
     public Properties getShipCacheProperties() {
         return properties;
     }
-    
+
     /**
      * Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
      * @return the {@link StreamingCommand} for this <code>StreamSpec</code>
@@ -121,17 +122,13 @@ public class POStream extends PhysicalOp
     public StreamingCommand getCommand() {
         return command;
     }
-    
-    
-    /* (non-Javadoc)
-     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
-     */
+
     @Override
     public Result getNextTuple() throws ExecException {
         // The POStream Operator works with ExecutableManager to
         // send input to the streaming binary and to get output
         // from it. To achieve a tuple oriented behavior, two queues
-        // are used - one for output from the binary and one for 
+        // are used - one for output from the binary and one for
         // input to the binary. In each getNext() call:
         // 1) If there is no more output expected from the binary, an EOP is
         // sent to successor
@@ -141,14 +138,14 @@ public class POStream extends PhysicalOp
         // send input to the binary, then the next tuple from the
         // predecessor is got and passed to the binary
         try {
-            // if we are being called AFTER all output from the streaming 
+            // if we are being called AFTER all output from the streaming
             // binary has already been sent to us then just return EOP
             // The "allOutputFromBinaryProcessed" flag is set when we see
             // an EOS (End of Stream output) from streaming binary
             if(allOutputFromBinaryProcessed) {
-                return new Result(POStatus.STATUS_EOP, null);
+                return EOP_RESULT;
             }
-            
+
             // if we are here AFTER all map() calls have been completed
             // AND AFTER we process all possible input to be sent to the
             // streaming binary, then all we want to do is read output from
@@ -159,19 +156,16 @@ public class POStream extends PhysicalOp
                     // If we received EOS, it means all output
                     // from the streaming binary has been sent to us
                     // So we can send an EOP to the successor in
-                    // the pipeline. Also since we are being called
-                    // after all input from predecessor has been processed
-                    // it means we got here from a call from close() in
-                    // map or reduce. So once we send this EOP down, 
-                    // getNext() in POStream should never be called. So
-                    // we don't need to set any flag noting we saw all output
-                    // from binary
+                    // the pipeline and also note this condition
+                    // for future calls
                     r = EOP_RESULT;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    allOutputFromBinaryProcessed = true;
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
+                }
                 return(r);
             }
-            
+
             // if we are here, we haven't consumed all input to be sent
             // to the streaming binary - check if we are being called
             // from close() on the map or reduce
@@ -184,7 +178,7 @@ public class POStream extends PhysicalOp
                     // then "initialized" will be true. If not, just
                     // send EOP down.
                     if(getInitialized()) {
-                        // signal End of ALL input to the Executable Manager's 
+                        // signal End of ALL input to the Executable Manager's
                         // Input handler thread
                         binaryInputQueue.put(r);
                         // note this state for future calls
@@ -195,30 +189,24 @@ public class POStream extends PhysicalOp
                             // If we received EOS, it means all output
                             // from the streaming binary has been sent to us
                             // So we can send an EOP to the successor in
-                            // the pipeline. Also since we are being called
-                            // after all input from predecessor has been processed
-                            // it means we got here from a call from close() in
-                            // map or reduce. So once we send this EOP down, 
-                            // getNext() in POStream should never be called. So
-                            // we don't need to set any flag noting we saw all output
-                            // from binary
+                            // the pipeline and also note this condition
+                            // for future calls
                             r = EOP_RESULT;
+                            allOutputFromBinaryProcessed = true;
                         }
                     }
-                    
+
                 } else if(r.returnStatus == POStatus.STATUS_EOS) {
                     // If we received EOS, it means all output
                     // from the streaming binary has been sent to us
                     // So we can send an EOP to the successor in
-                    // the pipeline. Also we are being called
-                    // from close() in map or reduce (this is so because
-                    // only then this.parentPlan.endOfAllInput is true).
-                    // So once we send this EOP down, getNext() in POStream
-                    // should never be called. So we don't need to set any 
-                    // flag noting we saw all output from binary
+                    // the pipeline and also note this condition
+                    // for future calls
                     r = EOP_RESULT;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    allOutputFromBinaryProcessed = true;
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                   illustratorMarkup(r.result, r.result, 0);
+                }
                 return r;
             } else {
                 // we are not being called from close() - so
@@ -232,18 +220,19 @@ public class POStream extends PhysicalOp
                     // for future calls
                     r = EOP_RESULT;
                     allOutputFromBinaryProcessed  = true;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
+                }
                 return r;
             }
-            
+
         } catch(Exception e) {
             int errCode = 2083;
             String msg = "Error while trying to get next result in POStream.";
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-            
-        
+
+
     }
 
     public synchronized boolean getInitialized() {
@@ -264,13 +253,13 @@ public class POStream extends PhysicalOp
                         Result res = binaryOutputQueue.take();
                         return res;
                     }
-                    
-                    // check if we can write tuples to 
+
+                    // check if we can write tuples to
                     // input of the process
                     if(binaryInputQueue.remainingCapacity() > 0) {
-                        
+
                         Result input = processInput();
-                        if(input.returnStatus == POStatus.STATUS_EOP || 
+                        if(input.returnStatus == POStatus.STATUS_EOP ||
                                 input.returnStatus == POStatus.STATUS_ERR) {
                             return input;
                         } else {
@@ -278,16 +267,16 @@ public class POStream extends PhysicalOp
                             // Only when we see the first tuple which can
                             // be sent as input to the binary we want
                             // to initialize the ExecutableManager and set
-                            // up the streaming binary - this is required in 
+                            // up the streaming binary - this is required in
                             // Unions due to a JOIN where there may never be
                             // any input to send to the binary in one of the map
                             // tasks - so we initialize only if we have to.
                             // initialize the ExecutableManager once
                             if(!initialized) {
                                 // set up the executableManager
-                                executableManager = 
+                                executableManager =
                                     (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr);
-                                
+
                                 try {
                                     executableManager.configure(this);
                                     executableManager.run();
@@ -295,22 +284,22 @@ public class POStream extends PhysicalOp
                                     int errCode = 2084;
                                     String msg = "Error while running streaming binary.";
                                     throw new ExecException(msg, errCode, PigException.BUG, ioe);
-                                }            
+                                }
                                 initialized = true;
                             }
-                            
+
                             // send this input to the streaming
                             // process
                             binaryInputQueue.put(input);
                         }
-                        
+
                     } else {
-                        
+
                         // wait for either input to be available
                         // or output to be consumed
                         while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty())
                             wait();
-                        
+
                     }
                 }
             }
@@ -320,21 +309,22 @@ public class POStream extends PhysicalOp
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
+    @Override
     public String toString() {
         return getAliasString() + "POStream" + "[" + command.toString() + "]"
                 + " - " + mKey.toString();
     }
- 
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitStream(this);
-        
+
     }
 
     @Override
     public String name() {
-       return toString(); 
+       return toString();
     }
 
     @Override
@@ -348,7 +338,7 @@ public class POStream extends PhysicalOp
     }
 
     /**
-     * 
+     *
      */
     public void finish() throws IOException {
         executableManager.close();
@@ -367,7 +357,7 @@ public class POStream extends PhysicalOp
     public BlockingQueue<Result> getBinaryOutputQueue() {
         return binaryOutputQueue;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
       if(illustrator != null) {

Modified: pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf?rev=1712130&r1=1712129&r2=1712130&view=diff
==============================================================================
--- pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.15/test/e2e/pig/tests/nightly.conf Mon Nov  2 19:55:12 2015
@@ -689,7 +689,24 @@ store c into ':OUTPATH:';\,
                         store d into ':OUTPATH:'; #,
             'java_params' => ['-Dpig.exec.mapPartAgg=true']
             
-            },            
+            },
+            
+            {
+            #PIG-4707 Streaming and empty input
+
+            'num' => 6,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        b = group a by name;
+                        c = foreach b generate flatten(a);
+                        d = stream c through `cat` as (name, age, gpa);
+                        e = filter d by name == 'nonexistent';
+                        SPLIT e into f if gpa > 2, g otherwise;
+                        store f into ':OUTPATH:.1'; 
+                        store g into ':OUTPATH:.2'; 
+                        #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },         
         
             ],
         },