You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and was not preserved in this copy.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/14 00:45:47 UTC

svn commit: r985392 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/

Author: thejas
Date: Fri Aug 13 22:45:46 2010
New Revision: 985392

URL: http://svn.apache.org/viewvc?rev=985392&view=rev
Log:
PIG-1448: Detach tuple from inner plans of physical operator (thejas)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 13 22:45:46 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1448: Detach tuple from inner plans of physical operator (thejas)
+
 PIG-965: PERFORMANCE: optimize common case in matches (PORegex) (ankit.modi
 via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Fri Aug 13 22:45:46 2010
@@ -84,6 +84,12 @@ public class PODemux extends PhysicalOpe
      * The leaf of the current pipeline
      */
     private PhysicalOperator curLeaf = null;
+
+    
+    /**
+     * The current pipeline plan
+     */
+    private PhysicalPlan curPlan = null;
     
     /*
      * Indicating if this operator is in a combiner. 
@@ -192,7 +198,8 @@ public class PODemux extends PhysicalOpe
         } else { 
         
             if (getNext) {
-                
+                if(curPlan != null)
+                    curPlan.detachInput();
                 Result inp = processInput();
                 
                 if (inp.returnStatus == POStatus.STATUS_EOP) {
@@ -238,6 +245,8 @@ public class PODemux extends PhysicalOpe
                        
             if (processedSet.cardinality() == myPlans.size()) {
                 curLeaf = null;
+                if(curPlan != null)
+                    curPlan.detachInput();
                 Result inp = processInput();
                 if (inp.returnStatus == POStatus.STATUS_OK) {                
                     attachInputWithIndex((Tuple)inp.result);
@@ -296,22 +305,21 @@ public class PODemux extends PhysicalOpe
         // is expected by the inner plans, as well as the index of the associated
         // inner plan.
         PigNullableWritable fld = (PigNullableWritable)res.get(0);        
-    
         // choose an inner plan to run based on the index set by
         // the POLocalRearrange operator and passed to this operator
         // by POMultiQueryPackage
         int index = fld.getIndex();
         index &= idxPart;                      
-        
-        PhysicalPlan pl = myPlans.get(index);
-        if (!(pl.getRoots().get(0) instanceof PODemux)) {                             
+
+        curPlan = myPlans.get(index);
+        if (!(curPlan.getRoots().get(0) instanceof PODemux)) {                             
             res.set(0, fld.getValueAsPigType());
         }
-    
-        myPlans.get(index).attachInput(res);
-        return myPlans.get(index).getLeaves().get(0);
+        
+        curPlan.attachInput(res);
+        return curPlan.getLeaves().get(0);
     }
-
+    
     /**
      * Sets a flag indicating if this operator is 
      * in a combiner. 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Aug 13 22:45:46 2010
@@ -259,7 +259,7 @@ public class POFRJoin extends PhysicalOp
             Tuple key = TupleFactory.getInstance().newTuple(1);
             key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
-
+            lr.detachInput();
             // Configure the for each operator with the relevant bags
             int i = -1;
             boolean noMatch = false;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Aug 13 22:45:46 2010
@@ -146,6 +146,7 @@ public class POFilter extends PhysicalOp
             }
             */
             res = comOp.getNext(dummyBool);
+            plan.detachInput();
             if (res.returnStatus != POStatus.STATUS_OK 
                     && res.returnStatus != POStatus.STATUS_NULL) 
                 return res;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Aug 13 22:45:46 2010
@@ -118,8 +118,7 @@ public class POLocalRearrange extends Ph
     private int mProjectedColsMapSize = 0;
     private int mSecondaryProjectedColsMapSize = 0;
 
-    private Tuple lrOutput;
-    
+       
     private boolean useSecondaryKey = false;
     
     // By default, we strip keys from the value.
@@ -144,7 +143,6 @@ public class POLocalRearrange extends Ph
         secondaryLeafOps = new ArrayList<ExpressionOperator>();
         mProjectedColsMap = new HashMap<Integer, Integer>();
         mSecondaryProjectedColsMap = new HashMap<Integer, Integer>();
-        lrOutput = mTupleFactory.newTuple(3);
     }
 
     @Override
@@ -222,7 +220,6 @@ public class POLocalRearrange extends Ph
             // indices and hence would go to different invocation of reduce()
             this.index = multiQuery ? (byte)(index | PigNullableWritable.mqFlag) : (byte)index;
         }            
-        lrOutput.set(0, Byte.valueOf(this.index));
     }
     
     public boolean isDistinct() { 
@@ -378,11 +375,22 @@ public class POLocalRearrange extends Ph
             res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);            
             res.returnStatus = POStatus.STATUS_OK;
             
+            detachPlans(plans);
+
+            if(secondaryPlans != null)
+                detachPlans(secondaryPlans);
+            
             return res;
         }
         return inp;
     }
     
+    private void detachPlans(List<PhysicalPlan> plans) {
+        for (PhysicalPlan ep : plans) {
+            ep.detachInput();
+        }
+    }
+
     protected Object getKeyFromResult(List<Result> resLst, byte type) throws ExecException {
         Object key;
         if(resLst.size()>1){
@@ -412,6 +420,8 @@ public class POLocalRearrange extends Ph
     }
     
     protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
+        Tuple lrOutput = mTupleFactory.newTuple(3);
+        lrOutput.set(0, Byte.valueOf(this.index));
         //Construct key
         Object key;
         Object secondaryKey=null;
@@ -673,13 +683,6 @@ public class POLocalRearrange extends Ph
         clone.secondaryKeyType = secondaryKeyType;
         clone.useSecondaryKey = useSecondaryKey;
         clone.index = index;
-        try {
-            clone.lrOutput.set(0, index);
-        } catch (ExecException e) {
-            CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting index of output.");
-            cnse.initCause(e);
-            throw cnse;
-        }
         // Needs to be called as setDistinct so that the fake index tuple gets
         // created.
         clone.setDistinct(mIsDistinct);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Aug 13 22:45:46 2010
@@ -461,7 +461,7 @@ public class POMergeCogroup extends Phys
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
         } 
-
+        lr.detachInput();
         return mTupleFactory.newTuple(((Tuple)lrOut.result).getAll());
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Aug 13 22:45:46 2010
@@ -419,6 +419,7 @@ public class POMergeJoin extends Physica
                 }
             } else {
                 Result res = rightPipelineLeaf.getNext(dummyTuple);
+                rightPipelineLeaf.detachInput();
                 switch(res.returnStatus){
                 case POStatus.STATUS_OK:
                     return res;
@@ -461,6 +462,7 @@ public class POMergeJoin extends Physica
         POLocalRearrange lr = LRs[lrIdx];
         lr.attachInput((Tuple)inp.result);
         Result lrOut = lr.getNext(dummyTuple);
+        lr.detachInput();
         if(lrOut.returnStatus!=POStatus.STATUS_OK){
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Fri Aug 13 22:45:46 2010
@@ -240,6 +240,7 @@ public class POMultiQueryPackage extends
         pack.attachInput(curKey, tupIter);
 
         Result res = pack.getNext(t);
+        pack.detachInput();
         
         Tuple tuple = (Tuple)res.result;
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Fri Aug 13 22:45:46 2010
@@ -108,6 +108,8 @@ public class POOptimizedForEach extends 
                 }
                 if(res.returnStatus==POStatus.STATUS_EOP) {
                     processingPlan = false;
+                    for(PhysicalPlan plan : inputPlans)
+                        plan.detachInput();
                     return res;
                 }
                 if(res.returnStatus==POStatus.STATUS_ERR) {