Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [19/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebr...

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -51,35 +52,35 @@
 
 public class POUserFunc extends ExpressionOperator {
 
-	/**
+    /**
      * 
      */
     private static final long serialVersionUID = 1L;
     transient EvalFunc func;
-	
-	transient private final Log log = LogFactory.getLog(getClass());
-	FuncSpec funcSpec;
+    
+    transient private final Log log = LogFactory.getLog(getClass());
+    FuncSpec funcSpec;
     FuncSpec origFSpec;
-	public static final byte INITIAL = 0;
-	public static final byte INTERMEDIATE = 1;
-	public static final byte FINAL = 2;
-	private boolean initialized = false;
-
-	public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-		super(k, rp);
-		inputs = inp;
+    public static final byte INITIAL = 0;
+    public static final byte INTERMEDIATE = 1;
+    public static final byte FINAL = 2;
+    private boolean initialized = false;
 
-	}
+    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp);
+        inputs = inp;
 
-	public POUserFunc(
+    }
+
+    public POUserFunc(
             OperatorKey k,
             int rp,
             List<PhysicalOperator> inp,
             FuncSpec funcSpec) {
-		this(k, rp, inp, funcSpec, null);
-	}
-	
-	public POUserFunc(
+        this(k, rp, inp, funcSpec, null);
+    }
+    
+    public POUserFunc(
             OperatorKey k,
             int rp,
             List<PhysicalOperator> inp,
@@ -87,60 +88,60 @@
             EvalFunc func) {
         super(k, rp);
         super.setInputs(inp);
-		this.funcSpec = funcSpec;
+        this.funcSpec = funcSpec;
         this.origFSpec = funcSpec;
-		this.func = func;
+        this.func = func;
         instantiateFunc(funcSpec);
-	}
+    }
 
-	private void instantiateFunc(FuncSpec fSpec) {
-		this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
-		//the next couple of initializations do not work as intended for the following reasons
-		//the reporter and pigLogger are member variables of PhysicalOperator
-		//when instanitateFunc is invoked at deserialization time, both
-		//reporter and pigLogger are null. They are set during map and reduce calls,
-		//making the initializations here basically useless. Look at the processInput
-		//method where these variables are re-initialized. At that point, the PhysicalOperator
-		//is set up correctly with the reporter and pigLogger references
+    private void instantiateFunc(FuncSpec fSpec) {
+        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        //the next couple of initializations do not work as intended for the following reasons
+        //the reporter and pigLogger are member variables of PhysicalOperator
+        //when instantiateFunc is invoked at deserialization time, both
+        //reporter and pigLogger are null. They are set during map and reduce calls,
+        //making the initializations here basically useless. Look at the processInput
+        //method where these variables are re-initialized. At that point, the PhysicalOperator
+        //is set up correctly with the reporter and pigLogger references
         this.func.setReporter(reporter);
         this.func.setPigLogger(pigLogger);
-	}
-	
-	public Result processInput() throws ExecException {
+    }
+    
+    public Result processInput() throws ExecException {
 
         // Make sure the reporter is set, because it isn't getting carried
         // across in the serialization (don't know why).  I suspect it's as
         // cheap to call setReporter every time as to check whether I
         // have (hopefully java will inline it).
         if(!initialized) {
-        	func.setReporter(reporter);
-        	func.setPigLogger(pigLogger);
-        	initialized = true;
+            func.setReporter(reporter);
+            func.setPigLogger(pigLogger);
+            initialized = true;
         }
 
-		Result res = new Result();
-		Tuple inpValue = null;
-		if (input == null && (inputs == null || inputs.size()==0)) {
+        Result res = new Result();
+        Tuple inpValue = null;
+        if (input == null && (inputs == null || inputs.size()==0)) {
 //			log.warn("No inputs found. Signaling End of Processing.");
-			res.returnStatus = POStatus.STATUS_EOP;
-			return res;
-		}
-
-		//Should be removed once the model is clear
-		if(reporter!=null) reporter.progress();
-
-		
-		if(isInputAttached()) {
-			res.result = input;
-			res.returnStatus = POStatus.STATUS_OK;
-			detachInput();
-			return res;
-		} else {
-			res.result = TupleFactory.getInstance().newTuple();
-			
-			Result temp = null;
-			for(PhysicalOperator op : inputs) {
-				switch(op.getResultType()){
+            res.returnStatus = POStatus.STATUS_EOP;
+            return res;
+        }
+
+        //Should be removed once the model is clear
+        if(reporter!=null) reporter.progress();
+
+        
+        if(isInputAttached()) {
+            res.result = input;
+            res.returnStatus = POStatus.STATUS_OK;
+            detachInput();
+            return res;
+        } else {
+            res.result = TupleFactory.getInstance().newTuple();
+            
+            Result temp = null;
+            for(PhysicalOperator op : inputs) {
+                switch(op.getResultType()){
                 case DataType.BAG:
                     temp = op.getNext(dummyBag);
                     break;
@@ -187,210 +188,222 @@
                     }
                 }
                 ((Tuple)res.result).append(temp.result);
-			}
-			res.returnStatus = temp.returnStatus;
-			return res;
-		}
-	}
+            }
+            res.returnStatus = temp.returnStatus;
+            return res;
+        }
+    }
 
-	private Result getNext() throws ExecException {
-        Result result = processInput();
+    private Result getNext() throws ExecException {
+        Result result = processInput();
         String errMsg = "";
-		try {
-			if(result.returnStatus == POStatus.STATUS_OK) {
-				result.result = func.exec((Tuple) result.result);
+        try {
+            if(result.returnStatus == POStatus.STATUS_OK) {
+                if (isAccumulative()) {
+                    if (isAccumStarted()) {
+                        ((Accumulator)func).accumulate((Tuple)result.result);
+                        result.returnStatus = POStatus.STATUS_BATCH_OK;
+                        result.result = null;
+                    } else {
+                        result.result = ((Accumulator)func).getValue();
+                        ((Accumulator)func).cleanup();
+                    }
+                } else {
+                    result.result = func.exec((Tuple) result.result);
+                }
                 if(resultType == DataType.BYTEARRAY) {
                     if(result.result != null && DataType.findType(result.result) != DataType.BYTEARRAY) {
                         result.result = new DataByteArray(result.result.toString().getBytes());
                     }
                 }
-				return result;
-			}
-			return result;
-		} catch (ExecException ee) {
-		    throw ee;
-		} catch (IOException ioe) {
-		    int errCode = 2078;
-		    String msg = "Caught error from UDF: " + funcSpec.getClassName(); 
+                return result;
+            }
+
+            return result;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (IOException ioe) {
+            int errCode = 2078;
+            String msg = "Caught error from UDF: " + funcSpec.getClassName(); 
             String footer = " [" + ioe.getMessage() + "]";
-		    
-		    if(ioe instanceof PigException) {
-		        int udfErrorCode = ((PigException)ioe).getErrorCode();
-		        if(udfErrorCode != 0) {
-		            errCode = udfErrorCode;
-		            msg = ((PigException)ioe).getMessage();
-		        } else {
-		            msg += " [" + ((PigException)ioe).getMessage() + " ]";
-		        }
-		    } else {
-		        msg += footer;
-		    }
-		    
-			throw new ExecException(msg, errCode, PigException.BUG, ioe);
-		} catch (IndexOutOfBoundsException ie) {
+            
+            if(ioe instanceof PigException) {
+                int udfErrorCode = ((PigException)ioe).getErrorCode();
+                if(udfErrorCode != 0) {
+                    errCode = udfErrorCode;
+                    msg = ((PigException)ioe).getMessage();
+                } else {
+                    msg += " [" + ((PigException)ioe).getMessage() + "]";
+                }
+            } else {
+                msg += footer;
+            }
+            
+            throw new ExecException(msg, errCode, PigException.BUG, ioe);
+        } catch (IndexOutOfBoundsException ie) {
             int errCode = 2078;
             String msg = "Caught error from UDF: " + funcSpec.getClassName() + 
             ", Out of bounds access [" + ie.getMessage() + "]";
             throw new ExecException(msg, errCode, PigException.BUG, ie);
-    	}
-	}
+        }
+    }
 
-	@Override
-	public Result getNext(Tuple tIn) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(Tuple tIn) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(DataBag db) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Integer i) throws ExecException {
-		return getNext();
-	}
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Boolean b) throws ExecException {
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(DataByteArray ba) throws ExecException {
+    @Override
+    public Result getNext(DataByteArray ba) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Double d) throws ExecException {
+    @Override
+    public Result getNext(Double d) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Float f) throws ExecException {
+    @Override
+    public Result getNext(Float f) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Long l) throws ExecException {
+    @Override
+    public Result getNext(Long l) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(Map m) throws ExecException {
+    @Override
+    public Result getNext(Map m) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	@Override
-	public Result getNext(String s) throws ExecException {
+    @Override
+    public Result getNext(String s) throws ExecException {
 
-		return getNext();
-	}
+        return getNext();
+    }
 
-	public void setAlgebraicFunction(byte Function) throws ExecException {
-		// This will only be used by the optimizer for putting correct functions
-		// in the mapper,
-		// combiner and reduce. This helps in maintaining the physical plan as
-		// is without the
-		// optimiser having to replace any operators.
-		// You wouldn't be able to make two calls to this function on the same
-		// algebraic EvalFunc as
-		// func is being changed.
-		switch (Function) {
-		case INITIAL:
+    public void setAlgebraicFunction(byte Function) throws ExecException {
+        // This will only be used by the optimizer for putting the correct
+        // functions in the mapper, combiner and reducer. This helps in
+        // maintaining the physical plan as is, without the optimizer having
+        // to replace any operators. You wouldn't be able to make two calls
+        // to this function on the same algebraic EvalFunc, as func is being
+        // changed.
+        switch (Function) {
+        case INITIAL:
+        case INITIAL:
             funcSpec = new FuncSpec(getInitial());
-			break;
-		case INTERMEDIATE:
+            break;
+        case INTERMEDIATE:
             funcSpec = new FuncSpec(getIntermed());
-			break;
-		case FINAL:
+            break;
+        case FINAL:
             funcSpec = new FuncSpec(getFinal());
-			break;
+            break;
 
-		}
+        }
         instantiateFunc(funcSpec);
         setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
-	}
+    }
 
-	public String getInitial() throws ExecException {
-	    instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getInitial();
-		} else {
-		    int errCode = 2072;
-			String msg = "Attempt to run a non-algebraic function"
+    public String getInitial() throws ExecException {
+        instantiateFunc(origFSpec);
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getInitial();
+        } else {
+            int errCode = 2072;
+            String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public String getIntermed() throws ExecException {
+    public String getIntermed() throws ExecException {
         instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getIntermed();
-		} else {
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getIntermed();
+        } else {
             int errCode = 2072;
             String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public String getFinal() throws ExecException {
+    public String getFinal() throws ExecException {
         instantiateFunc(origFSpec);
-		if (func instanceof Algebraic) {
-			return ((Algebraic) func).getFinal();
-		} else {
+        if (func instanceof Algebraic) {
+            return ((Algebraic) func).getFinal();
+        } else {
             int errCode = 2072;
             String msg = "Attempt to run a non-algebraic function"
                 + " as an algebraic function";
             throw new ExecException(msg, errCode, PigException.BUG);
-		}
-	}
+        }
+    }
 
-	public Type getReturnType() {
-		return func.getReturnType();
-	}
+    public Type getReturnType() {
+        return func.getReturnType();
+    }
 
-	public void finish() {
-		func.finish();
-	}
+    public void finish() {
+        func.finish();
+    }
 
-	public Schema outputSchema(Schema input) {
-		return func.outputSchema(input);
-	}
+    public Schema outputSchema(Schema input) {
+        return func.outputSchema(input);
+    }
 
-	public Boolean isAsynchronous() {
-		return func.isAsynchronous();
-	}
+    public Boolean isAsynchronous() {
+        return func.isAsynchronous();
+    }
 
-	@Override
-	public String name() {
-	    return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
-	}
+    @Override
+    public String name() {
+        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+    }
 
-	@Override
-	public boolean supportsMultipleInputs() {
+    @Override
+    public boolean supportsMultipleInputs() {
 
-		return true;
-	}
+        return true;
+    }
 
-	@Override
-	public boolean supportsMultipleOutputs() {
+    @Override
+    public boolean supportsMultipleOutputs() {
 
-		return false;
-	}
+        return false;
+    }
 
-	@Override
-	public void visit(PhyPlanVisitor v) throws VisitorException {
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
 
-		v.visitUserFunc(this);
-	}
+        v.visitUserFunc(this);
+    }
 
     public FuncSpec getFuncSpec() {
         return funcSpec;
@@ -414,4 +427,12 @@
         is.defaultReadObject();
         instantiateFunc(funcSpec);
     }
+
+    /**
+     * Get child expression of this expression
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        return null;
+    }
 }
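
The branch added to getNext() above drives UDFs that implement org.apache.pig.Accumulator: accumulate() is called once per batch (the operator reports STATUS_BATCH_OK with a null result), and once batching ends, getValue() and then cleanup() produce the final value and reset state. Below is a minimal sketch of such a UDF, assuming only the Accumulator and EvalFunc methods exercised by this patch; the class name is illustrative.

    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    // Hypothetical COUNT-style UDF usable in both regular and accumulative mode.
    public class AccumulativeCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        // Called once per batch; the bag in the input tuple holds only the
        // tuples of the current batch.
        public void accumulate(Tuple b) throws IOException {
            count += ((DataBag) b.get(0)).size();
        }

        // Called once, after the last batch has been accumulated.
        public Long getValue() {
            return count;
        }

        // Called after getValue() to reset state for the next key.
        public void cleanup() {
            count = 0;
        }

        // Regular path: the whole bag is materialized at once.
        @Override
        public Long exec(Tuple input) throws IOException {
            return ((DataBag) input.get(0)).size();
        }
    }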

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Subtract.java Tue Nov 24 19:54:19 2009
@@ -53,6 +53,11 @@
 
     @Override
     public Result getNext(Double d) throws ExecException {
+        Result r = accumChild(null, d);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Double left = null, right = null;
@@ -76,6 +81,11 @@
     
     @Override
     public Result getNext(Float f) throws ExecException {
+        Result r = accumChild(null, f);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Float left = null, right = null;
@@ -99,6 +109,11 @@
     
     @Override
     public Result getNext(Integer i) throws ExecException {
+        Result r = accumChild(null, i);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Integer left = null, right = null;
@@ -122,6 +137,11 @@
     
     @Override
     public Result getNext(Long l) throws ExecException {
+        Result r = accumChild(null, l);
+        if (r != null) {
+            return r;
+        }
+        
         byte status;
         Result res;
         Long left = null, right = null;
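
The same accumChild guard is added to each typed getNext() of the expression operators in this commit: while the operator's child expressions are consuming a batch, the guard returns a non-null batch-progress Result and the arithmetic below it is skipped. A self-contained toy model of the pattern, with Pig's Result machinery reduced to strings (all names here are illustrative):

    // Toy model of the accumChild guard pattern; not Pig's actual classes.
    public class SubtractModel {
        private boolean accumulating;   // stands in for isAccumStarted()

        // Mirrors: Result r = accumChild(null, d); if (r != null) return r;
        String getNextDouble(double left, double right) {
            String batched = accumChild();
            if (batched != null) {
                return batched;         // children consumed a batch; no value yet
            }
            return "OK: " + (left - right);
        }

        private String accumChild() {
            return accumulating ? "BATCH_OK" : null;
        }

        public static void main(String[] args) {
            SubtractModel op = new SubtractModel();
            op.accumulating = true;
            System.out.println(op.getNextDouble(5, 3));  // BATCH_OK
            op.accumulating = false;
            System.out.println(op.getNextDouble(5, 3));  // OK: 2.0
        }
    }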

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryExpressionOperator.java Tue Nov 24 19:54:19 2009
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -26,6 +29,7 @@
 public abstract class UnaryExpressionOperator extends ExpressionOperator {
 
     ExpressionOperator expr;
+    private transient List<ExpressionOperator> child;
     
     public UnaryExpressionOperator(OperatorKey k, int rp) {
         super(k, rp);
@@ -73,4 +77,17 @@
         resultType = op.getResultType();
     }
 
+    /**
+     * Get child expression of this expression
+     */
+    @Override
+    public List<ExpressionOperator> getChildExpressions() {
+        if (child == null) {
+            child = new ArrayList<ExpressionOperator>();
+            child.add(expr);
+        }
+
+        return child;
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Tue Nov 24 19:54:19 2009
@@ -204,12 +204,6 @@
     }
 
     @Override
-    public boolean equals(Object obj) {
-        // TODO Auto-generated method stub
-        return super.equals(obj);
-    }
-
-    @Override
     public PhysicalPlan clone() throws CloneNotSupportedException {
         PhysicalPlan clone = new PhysicalPlan();
 

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/AccumulativeTupleBuffer.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * This interface is used during the Reduce phase to process tuples
+ * in batch mode. It is used by POPackage when all of the UDFs can be
+ * called in accumulative mode. Tuples are not pulled all at once;
+ * instead, each time only a specified number of tuples are pulled out
+ * of the iterator and put into a buffer. This buffer is then wrapped into
+ * a bag to be passed to the operators in the reduce plan.
+ * 
+ * The purpose of doing this is to reduce memory usage and avoid spilling.
+ */
+public interface AccumulativeTupleBuffer {
+    
+    /**
+     * Pull next batch of tuples from iterator and put them into this buffer
+     */
+    public void nextBatch() throws IOException;
+    
+    /**
+     * Whether there are more tuples to pull out of the iterator
+     */
+    public boolean hasNextBatch();
+    
+    /**
+     * Clear the internal buffer. This should be called after all data has been retrieved
+     */
+    public void clear();
+    
+    /**
+     * Get an iterator over the tuples in the buffer
+     * @param index  the index of the input bag whose tuples are returned
+     */
+    public Iterator<Tuple> getTuples(int index);
+}
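
A minimal sketch of an implementation follows, assuming a single input; the real buffer built by POPackage keeps one tuple list per input and uses the index argument of getTuples() to pick among them. The class name is illustrative, and it is assumed to live in the same package as the interface.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.pig.data.Tuple;

    // Hypothetical single-input buffer pulling at most batchSize tuples per batch.
    public class SimpleTupleBuffer implements AccumulativeTupleBuffer {
        private final Iterator<Tuple> input;   // the reduce-side iterator
        private final int batchSize;
        private final List<Tuple> buffer = new ArrayList<Tuple>();

        public SimpleTupleBuffer(Iterator<Tuple> input, int batchSize) {
            this.input = input;
            this.batchSize = batchSize;
        }

        public void nextBatch() throws IOException {
            buffer.clear();
            for (int i = 0; i < batchSize && input.hasNext(); i++) {
                buffer.add(input.next());
            }
        }

        public boolean hasNextBatch() {
            return input.hasNext();
        }

        public void clear() {
            buffer.clear();
        }

        public Iterator<Tuple> getTuples(int index) {
            // a single-input buffer ignores index; the real implementation
            // selects among per-input lists here
            return buffer.iterator();
        }
    }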

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Tue Nov 24 19:54:19 2009
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -61,14 +62,7 @@
     private static Result eop = new Result(POStatus.STATUS_EOP, null);
     
     transient private Log log = LogFactory.getLog(getClass());
-    
-    /*
-     * The base index of this demux. In the case of
-     * a demux contained in another demux, the index
-     * passed in must be shifted before it can be used.
-     */
-    private int baseIndex = 0;
-    
+        
     /*
      * The list of sub-plans the inner plan is composed of
      */
@@ -178,7 +172,7 @@
 
     @Override
     public String name() {
-        return "Demux" + isKeyWrapped + "[" + baseIndex +"] - " + mKey.toString();
+        return "Demux" + isKeyWrapped + " - " + mKey.toString();
     }
 
     @Override
@@ -190,32 +184,35 @@
     public boolean supportsMultipleOutputs() {
         return false;
     }
-
+    
     /**
-     * Sets the base index of this demux. 
-     * 
-     * @param idx the base index
+     * Returns the list of inner plans.
+     *  
+     * @return the list of the nested plans
      */
-    public void setBaseIndex(int idx) {
-        baseIndex = idx;
+    public List<PhysicalPlan> getPlans() {
+        return myPlans;
     }
     
     /**
-     * Returns the base index of this demux
+     * Returns the list of booleans that indicates if the 
+     * key needs to unwrapped for the corresponding plan.
      * 
-     * @return the base index
+     * @return the list of isKeyWrapped boolean values
      */
-    public int getBaseIndex() {
-        return baseIndex;
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
     }
     
     /**
-     * Returns the list of inner plans.
-     *  
-     * @return the list of the nested plans
+     * Adds a list of IsKeyWrapped boolean values
+     * 
+     * @param lst the list of boolean values to add
      */
-    public List<PhysicalPlan> getPlans() {
-        return myPlans;
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        for (Boolean b : lst) {
+            isKeyWrapped.add(b);
+        }
     }
     
     /**
@@ -232,6 +229,12 @@
         isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
         keyPositions.add(keyPos);
     }
+    
+    public void addPlan(PhysicalPlan inPlan, boolean[] keyPos) {  
+        myPlans.add(inPlan);
+        processedSet.set(myPlans.size()-1);
+        keyPositions.add(keyPos);
+    }
    
     @Override
     public Result getNext(Tuple t) throws ExecException {
@@ -357,8 +360,7 @@
         // the POLocalRearrange operator and passed to this operator
         // by POMultiQueryPackage
         int index = fld.getIndex();
-        index &= idxPart;
-        index -= baseIndex;                         
+        index &= idxPart;                      
         
         PhysicalPlan pl = myPlans.get(index);
         if (!(pl.getRoots().get(0) instanceof PODemux)) {                             
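
With baseIndex gone, the inner plan is selected purely by masking the raw index carried in the key; nested demux operators are instead merged into their parent via addPlan() and addIsKeyWrappedList(), so no index shifting is needed. A toy model of the dispatch (the mask value and all names are illustrative, not taken from PODemux):

    import java.util.List;
    import java.util.function.UnaryOperator;

    // Toy model: the low bits of the record's index select the inner pipeline.
    public class DemuxModel {
        private static final int IDX_MASK = 0x7F;  // assumed: strips flag bits
        private final List<UnaryOperator<String>> plans;

        DemuxModel(List<UnaryOperator<String>> plans) {
            this.plans = plans;
        }

        String route(int rawIndex, String record) {
            int index = rawIndex & IDX_MASK;  // no baseIndex subtraction
            return plans.get(index).apply(record);
        }

        public static void main(String[] args) {
            List<UnaryOperator<String>> plans = List.of(
                    s -> "planA(" + s + ")",
                    s -> "planB(" + s + ")");
            DemuxModel d = new DemuxModel(plans);
            System.out.println(d.route(0x80 | 1, "t"));  // planB(t)
        }
    }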

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Tue Nov 24 19:54:19 2009
@@ -24,7 +24,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ExecType;
@@ -39,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -49,19 +49,18 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
-
 /**
- * The operator models the join keys using the Local Rearrange operators which 
- * are configured with the plan specified by the user. It also sets up
- * one Hashtable per replicated input which maps the Key(k) stored as a Tuple
- * to a DataBag which holds all the values in the input having the same key(k)
- * The getNext() reads an input from its predecessor and separates them into
- * key & value. It configures a foreach operator with the databags obtained from
- * each Hashtable for the key and also with the value for the fragment input.
- * It then returns tuples returned by this foreach operator.
+ * The operator models the join keys using the Local Rearrange operators which
+ * are configured with the plan specified by the user. It also sets up one
+ * Hashtable per replicated input which maps the Key(k) stored as a Tuple to a
+ * DataBag which holds all the values in the input having the same key(k) The
+ * getNext() reads an input from its predecessor and separates them into key &
+ * value. It configures a foreach operator with the databags obtained from each
+ * Hashtable for the key and also with the value for the fragment input. It then
+ * returns tuples returned by this foreach operator.
  */
 
-//We intentionally skip type checking in backend for performance reasons
+// We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
 public class POFRJoin extends PhysicalOperator {
     /**
@@ -69,35 +68,48 @@
      */
     private static final long serialVersionUID = 1L;
     static private Log log = LogFactory.getLog(POFRJoin.class);
-    //The number in the input list which denotes the fragmented input
+    // The number in the input list which denotes the fragmented input
     private int fragment;
-    //There can be n inputs each being a List<PhysicalPlan>
-    //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+    // There can be n inputs each being a List<PhysicalPlan>
+    // Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
     private List<List<PhysicalPlan>> phyPlanLists;
-    //The key type for each Local Rearrange operator
+    // The key type for each Local Rearrange operator
     private List<List<Byte>> keyTypes;
-    //The Local Rearrange operators modeling the join key
+    // The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
-    //The set of files that represent the replicated inputs
+    // The set of files that represent the replicated inputs
     private FileSpec[] replFiles;
-    //Used to configure the foreach operator
+    // Used to configure the foreach operator
     private ConstantExpression[] constExps;
-    //Used to produce the cross product of various bags
+    // Used to produce the cross product of various bags
     private POForEach fe;
-    //The array of Hashtables one per replicated input. replicates[fragment] = null
-    private Map<Tuple,List<Tuple>> replicates[];
-    //varaible which denotes whether we are returning tuples from the foreach operator
+    // The array of Hashtables, one per replicated input; replicates[fragment]
+    // is null, since fragment is the input which is fragmented rather than
+    // replicated.
+    private Map<Tuple, List<Tuple>> replicates[];
+    // variable which denotes whether we are returning tuples from the foreach
+    // operator
     private boolean processingPlan;
-    //A dummy tuple
+    // A dummy tuple
     private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
-    //An instance of tuple factory
+    // An instance of tuple factory
     private transient TupleFactory mTupleFactory;
     private transient BagFactory mBagFactory;
     private boolean setUp;
-    
-    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment) throws ExecException{
-        super(k,rp,inp);
-        
+    // Denotes whether this is a left outer join or an inner join
+    private boolean isLeftOuterJoin;
+
+    // A bag containing a null tuple matching the input schema; used for left outer join
+    private DataBag nullBag;
+
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
+            List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
+            FileSpec[] replFiles, int fragment, boolean isLeftOuter,
+            Tuple nullTuple)
+            throws ExecException {
+        super(k, rp, inp);
+
         phyPlanLists = ppLists;
         this.fragment = fragment;
         this.keyTypes = keyTypes;
@@ -109,32 +121,39 @@
         processingPlan = false;
         mTupleFactory = TupleFactory.getInstance();
         mBagFactory = BagFactory.getInstance();
+        List<Tuple> tupList = new ArrayList<Tuple>();
+        tupList.add(nullTuple);
+        nullBag = mBagFactory.newDefaultBag(tupList);
+        this.isLeftOuterJoin = isLeftOuter;
     }
-    
-    public List<List<PhysicalPlan>> getJoinPlans(){
+
+    public List<List<PhysicalPlan>> getJoinPlans() {
         return phyPlanLists;
     }
-    
-    private OperatorKey genKey(OperatorKey old){
-        return new OperatorKey(old.scope,NodeIdGenerator.getGenerator().getNextNodeId(old.scope));
+
+    private OperatorKey genKey(OperatorKey old) {
+        return new OperatorKey(old.scope, NodeIdGenerator.getGenerator()
+                .getNextNodeId(old.scope));
     }
-    
+
     /**
      * Configures the Local Rearrange operators & the foreach operator
+     * 
      * @param old
-     * @throws ExecException 
+     * @throws ExecException
      */
-    private void createJoinPlans(OperatorKey old) throws ExecException{
+    private void createJoinPlans(OperatorKey old) throws ExecException {
         List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
         List<Boolean> flatList = new ArrayList<Boolean>();
-        
-        int i=-1;
+
+        int i = -1;
         for (List<PhysicalPlan> ppLst : phyPlanLists) {
             ++i;
             POLocalRearrange lr = new POLocalRearrange(genKey(old));
             lr.setIndex(i);
             lr.setResultType(DataType.TUPLE);
-            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : keyTypes.get(i).get(0));
+            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE
+                    : keyTypes.get(i).get(0));
             try {
                 lr.setPlans(ppLst);
             } catch (PlanException pe) {
@@ -142,18 +161,22 @@
                 String msg = "Problem with setting up local rearrange's plans.";
                 throw new ExecException(msg, errCode, PigException.BUG, pe);
             }
-            LRs[i]= lr;
+            LRs[i] = lr;
             ConstantExpression ce = new ConstantExpression(genKey(old));
-            ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+            ce.setResultType((i == fragment) ? DataType.TUPLE : DataType.BAG);
             constExps[i] = ce;
             PhysicalPlan pp = new PhysicalPlan();
             pp.add(ce);
             fePlans.add(pp);
             flatList.add(true);
         }
-        fe = new POForEach(genKey(old),-1,fePlans,flatList);
+        // The ForEach operator here is used for generating the cross-product.
+        // It is given a set of constant expressions of the form
+        // Tuple,(Bag|Tuple),(...)
+        // and produces their cross-product as its output.
+        fe = new POForEach(genKey(old), -1, fePlans, flatList);
     }
-    
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitFRJoin(this);
@@ -161,18 +184,17 @@
 
     @Override
     public String name() {
-        return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+        return "FRJoin[" + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
     }
 
     @Override
     public boolean supportsMultipleInputs() {
-        // TODO Auto-generated method stub
         return true;
     }
 
     @Override
     public boolean supportsMultipleOutputs() {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -180,34 +202,36 @@
     public Result getNext(Tuple t) throws ExecException {
         Result res = null;
         Result inp = null;
-        if(!setUp){
+        if (!setUp) {
             setUpHashMap();
             setUp = true;
         }
-        if(processingPlan){
-            //Return tuples from the for each operator
-            //Assumes that it is configured appropriately with
-            //the bags for the current key.
-            while(true) {
+        if (processingPlan) {
+            // Return tuples from the for each operator
+            // Assumes that it is configured appropriately with
+            // the bags for the current key.
+            while (true) {
                 res = fe.getNext(dummyTuple);
-                
-                if(res.returnStatus==POStatus.STATUS_OK){
+
+                if (res.returnStatus == POStatus.STATUS_OK) {
                     return res;
                 }
-                if(res.returnStatus==POStatus.STATUS_EOP){
-                    processingPlan = false;
+                if (res.returnStatus == POStatus.STATUS_EOP) {
+                    // We have completed all cross-products; now it's time to
+                    // move on to the next tuple of the left (fragmented) side
+                    processingPlan = false;
                     break;
                 }
-                if(res.returnStatus==POStatus.STATUS_ERR) {
+                if (res.returnStatus == POStatus.STATUS_ERR) {
                     return res;
                 }
-                if(res.returnStatus==POStatus.STATUS_NULL) {
+                if (res.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 }
             }
         }
         while (true) {
-            //Process the current input
+            // Process the current input
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP
                     || inp.returnStatus == POStatus.STATUS_ERR)
@@ -215,99 +239,128 @@
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-            
-            //Separate Key & Value using the fragment's LR operator
+
+            // Separate Key & Value using the fragment's LR operator
             POLocalRearrange lr = LRs[fragment];
-            lr.attachInput((Tuple)inp.result);
+            lr.attachInput((Tuple) inp.result);
             Result lrOut = lr.getNext(dummyTuple);
-            if(lrOut.returnStatus!=POStatus.STATUS_OK) {
-                log.error("LocalRearrange isn't configured right or is not working");
+            if (lrOut.returnStatus != POStatus.STATUS_OK) {
+                log.error("LocalRearrange isn't configured right or is not working");
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
             Tuple key = TupleFactory.getInstance().newTuple(1);
-            key.set(0,lrOutTuple.get(1));
+            key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
-            
-            //Configure the for each operator with the relevant bags
-            int i=-1;
+
+            // Configure the for each operator with the relevant bags
+            int i = -1;
             boolean noMatch = false;
             for (ConstantExpression ce : constExps) {
                 ++i;
-                if(i==fragment){
+                if (i == fragment) {
+                    // Set the constant expression for the fragmented input to this tuple
                     ce.setValue(value);
                     continue;
                 }
                 Map<Tuple, List<Tuple>> replicate = replicates[i];
-                if(!replicate.containsKey(key)){
+                if (!replicate.containsKey(key)) {
+                    if (isLeftOuterJoin) {
+                        ce.setValue(nullBag);
+                    }
                     noMatch = true;
                     break;
                 }
                 ce.setValue(mBagFactory.newDefaultBag(replicate.get(key)));
             }
-            if(noMatch)
+
+            // If this is not a left outer join and there was no match, we
+            // skip the processing of this left tuple and move ahead
+            if (!isLeftOuterJoin && noMatch)
                 continue;
             fe.attachInput(dumTup);
             processingPlan = true;
-            
+
+            // We are all set; we call getNext (this function), which will
+            // call getNext on the ForEach operator. That returns one tuple
+            // of the cross-product between the constant expressions set above.
+            // All subsequent calls (by the parent) to this function return
+            // the next tuple of the cross-product.
             Result gn = getNext(dummyTuple);
+
             return gn;
         }
     }
 
     /**
-     * Builds the HashMaps by reading each replicated input from the DFS
-     * using a Load operator
+     * Builds the HashMaps by reading each replicated input from the DFS using a
+     * Load operator
+     * 
      * @throws ExecException
      */
     private void setUpHashMap() throws ExecException {
-        int i=-1;
+        int i = -1;
         long time1 = System.currentTimeMillis();
         for (FileSpec replFile : replFiles) {
             ++i;
-            if(i==fragment){
+
+            if (i == fragment) {
                 replicates[i] = null;
                 continue;
             }
 
-            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
-            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
+                    replFile, false);
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,
+                    ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
             pc.connect();
             ld.setPc(pc);
+            // We use the LocalRearrange operator to separate key and value,
+            // e.g. ( a, b, c ) would generate a, ( a, b, c ).
+            // We use 'a' as the key into the HashMap and add the rest,
+            // ( a, b, c ), as the value.
+            // We could have done this manually, but LocalRearrange does the
+            // same thing, so we reuse its functionality.
             POLocalRearrange lr = LRs[i];
-            lr.setInputs(Arrays.asList((PhysicalOperator)ld));
-            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+            lr.setInputs(Arrays.asList((PhysicalOperator) ld));
+            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(
+                    1000);
             log.debug("Completed setup. Trying to build replication hash table");
             int cnt = 0;
-            for(Result res=lr.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(dummyTuple)){
+            for (Result res = lr.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNext(dummyTuple)) {
                 ++cnt;
-                if(reporter!=null) reporter.progress();
+                if (reporter != null)
+                    reporter.progress();
                 Tuple tuple = (Tuple) res.result;
                 Tuple key = mTupleFactory.newTuple(1);
-                key.set(0,tuple.get(1));
+                key.set(0, tuple.get(1));
                 Tuple value = getValueTuple(lr, tuple);
-                if(!replicate.containsKey(key))
+                if (!replicate.containsKey(key))
                     replicate.put(key, new ArrayList<Tuple>());
                 replicate.get(key).add(value);
             }
             replicates[i] = replicate;
 
         }
-	long time2 = System.currentTimeMillis();
-        log.debug("Hash Table built. Time taken: " + (time2-time1));
+        long time2 = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (time2 - time1));
     }
-    
-    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+
+    private void readObject(ObjectInputStream is) throws IOException,
+            ClassNotFoundException, ExecException {
         is.defaultReadObject();
         mTupleFactory = TupleFactory.getInstance();
         mBagFactory = BagFactory.getInstance();
-//        setUpHashTable();
+        // setUpHashTable();
     }
-    
+
     /*
      * Extracts the value tuple from the LR operator's output tuple
      */
-    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple)
+            throws ExecException {
         Tuple val = (Tuple) tuple.get(2);
         Tuple retTup = null;
         boolean isProjectStar = lr.isProjectStar();
@@ -315,18 +368,18 @@
         int keyLookupSize = keyLookup.size();
         Object key = tuple.get(1);
         boolean isKeyTuple = lr.isKeyTuple();
-        Tuple keyAsTuple = isKeyTuple ? (Tuple)tuple.get(1) : null;
-        if( keyLookupSize > 0) {
-            
+        Tuple keyAsTuple = isKeyTuple ? (Tuple) tuple.get(1) : null;
+        if (keyLookupSize > 0) {
+
             // we have some fields of the "value" in the
             // "key".
             retTup = mTupleFactory.newTuple();
             int finalValueSize = keyLookupSize + val.size();
-            int valIndex = 0; // an index for accessing elements from 
-                              // the value (val) that we have currently
-            for(int i = 0; i < finalValueSize; i++) {
+            int valIndex = 0; // an index for accessing elements from
+            // the value (val) that we have currently
+            for (int i = 0; i < finalValueSize; i++) {
                 Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
+                if (keyIndex == null) {
                     // the field for this index is not in the
                     // key - so just take it from the "value"
                     // we were handed
@@ -334,7 +387,7 @@
                     valIndex++;
                 } else {
                     // the field for this index is in the key
-                    if(isKeyTuple) {
+                    if (isKeyTuple) {
                         // the key is a tuple, extract the
                         // field out of the tuple
                         retTup.append(keyAsTuple.get(keyIndex));
@@ -343,19 +396,19 @@
                     }
                 }
             }
-            
+
         } else if (isProjectStar) {
-            
+
             // the whole "value" is present in the "key"
             retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
-            
+
         } else {
-            
+
             // there is no field of the "value" in the
             // "key" - so just make a copy of what we got
             // as the "value"
             retTup = mTupleFactory.newTuple(val.getAll());
-            
+
         }
         return retTup;
     }
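
The class comment above summarizes the fragment-replicate strategy: hash each replicated input, stream the fragmented input, and emit the cross-product of the matches; with this patch, a left outer join substitutes a bag holding a single null tuple when a key has no match. A self-contained model of that flow, with Pig tuples simplified to string arrays (all names illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FRJoinSketch {
        public static void main(String[] args) {
            List<String[]> fragment = List.of(
                    new String[]{"1", "a"}, new String[]{"3", "c"});
            List<String[]> replicated = List.of(
                    new String[]{"1", "x"}, new String[]{"1", "y"});

            // Build phase: key -> all replicated rows with that key.
            Map<String, List<String[]>> table = new HashMap<>();
            for (String[] row : replicated) {
                table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
            }

            boolean leftOuter = true;
            String[] nullRow = {null, null};   // plays the role of nullBag above

            // Probe phase: stream the fragment, never materializing it.
            for (String[] left : fragment) {
                List<String[]> matches = table.get(left[0]);
                if (matches == null) {
                    if (!leftOuter) continue;    // inner join: skip on a miss
                    matches = List.of(nullRow);  // left outer: null-pad instead
                }
                for (String[] right : matches) { // cross-product for this key
                    System.out.println(String.join(",", left) + " | "
                            + String.join(",", right));
                }
            }
        }
    }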

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue Nov 24 19:54:19 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -37,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -59,7 +61,7 @@
     protected List<PhysicalPlan> inputPlans;
     protected List<PhysicalOperator> opsToBeReset;
     transient protected Log log = LogFactory.getLog(getClass());
-    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
     //Since the plan has a generate, this needs to be maintained
     //as the generate can potentially return multiple tuples for
     //same call.
@@ -89,6 +91,8 @@
 
     protected PhysicalOperator[] planLeafOps = null;
     
+    protected transient AccumulativeTupleBuffer buffer;
+    
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -146,15 +150,54 @@
     @Override
     public boolean supportsMultipleOutputs() {
         return false;
+    }
+
+    public void setAccumulative() {
+        super.setAccumulative();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+                    po.setAccumulative();
+                }
+            }
+        }
+    }
+
+    public void setAccumStart() {
+        super.setAccumStart();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+                    po.setAccumStart();
+                }
+            }
+        }
     }
     
+    public void setAccumEnd() {
+        super.setAccumEnd();
+        for(PhysicalPlan p : inputPlans) {            
+            Iterator<PhysicalOperator> iter = p.iterator();
+            while(iter.hasNext()) {
+                PhysicalOperator po = iter.next();
+                if (po instanceof ExpressionOperator || po instanceof PODistinct) {
+                    po.setAccumEnd();
+                }
+            }
+        }
+    }
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan and returns it maintaining an additional state
      * to denote the begin and end of the nested plan processing.
      */
     @Override
-    public Result getNext(Tuple t) throws ExecException {
+    public Result getNext(Tuple t) throws ExecException {
         Result res = null;
         Result inp = null;
         //The nested plan is under processing
@@ -162,14 +205,15 @@
         //returns
         if(processingPlan){
             while(true) {
-                res = processPlan();
+                res = processPlan();
+
                 if(res.returnStatus==POStatus.STATUS_OK) {
                     if(lineageTracer !=  null && res.result != null) {
-                	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-                	tOut.synthetic = tIn.synthetic;
-                	lineageTracer.insert(tOut);
-                	lineageTracer.union(tOut, tIn);
-                	res.result = tOut;
+                    ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+                    tOut.synthetic = tIn.synthetic;
+                    lineageTracer.insert(tOut);
+                    lineageTracer.union(tOut, tIn);
+                    res.result = tOut;
                     }
                     return res;
                 }
@@ -198,32 +242,71 @@
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
-            
+
             attachInputToPlans((Tuple) inp.result);
+            Tuple tuple = (Tuple)inp.result;
+            
             for (PhysicalOperator po : opsToBeReset) {
                 po.reset();
             }
-            res = processPlan();
+            
+            if (isAccumulative()) {
+                for(int i=0; i<tuple.size(); i++) {
+                    if (tuple.getType(i) == DataType.BAG) {
+                        // we only need to check one bag, because all the bags
+                        // share the same buffer
+                        buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+                        break;
+                    }
+                }
+
+                while(true) {
+                    if (buffer.hasNextBatch()) {
+                        try {
+                            buffer.nextBatch();
+                        } catch(IOException e) {
+                            throw new ExecException(e);
+                        }
+
+                        setAccumStart();
+                    } else {
+                        buffer.clear();
+                        setAccumEnd();
+                    }
+
+                    res = processPlan();
+
+                    if (res.returnStatus == POStatus.STATUS_BATCH_OK) {
+                        // attach same input again to process next batch
+                        attachInputToPlans((Tuple) inp.result);
+                    } else {
+                        break;
+                    }
+                }
+
+            } else {
+                res = processPlan();
+            }
             
             processingPlan = true;
 
             if(lineageTracer != null && res.result != null) {
-        	//we check for res.result since that can also be null in the case of flatten
-        	tIn = (ExampleTuple) inp.result;
-        	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-        	tOut.synthetic = tIn.synthetic;
-        	lineageTracer.insert(tOut);
-        	lineageTracer.union(tOut, tIn);
-        	res.result = tOut;
+            //we check for res.result since that can also be null in the case of flatten
+            tIn = (ExampleTuple) inp.result;
+            ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+            tOut.synthetic = tIn.synthetic;
+            lineageTracer.insert(tOut);
+            lineageTracer.union(tOut, tIn);
+            res.result = tOut;
             }
             
             return res;
         }
     }
 
-    protected Result processPlan() throws ExecException{
+    protected Result processPlan() throws ExecException{    	
         Result res = new Result();
-        
+
         //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
         if(its != null) {
             boolean restartIts = true;
@@ -238,6 +321,7 @@
             }
         }
         
+ 
         if(its == null) {
             //getNext being called for the first time OR starting with a set of new data from inputs 
             its = new Iterator[noItems];
@@ -287,9 +371,13 @@
                 }
                 
                 }
-                
+
+                if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {                	
+                    continue;
+                }
+
                 if(inputData.returnStatus == POStatus.STATUS_EOP) {
-                    //we are done with all the elements. Time to return.
+                    //we are done with all the elements. Time to return.                	
                     its = null;
                     bags = null;
                     return inputData;
@@ -310,6 +398,11 @@
             }
         }
 
+        // if accumulating, we haven't received data for some fields yet, so just return
+        if (isAccumulative() && isAccumStarted()) {
+            res.returnStatus = POStatus.STATUS_BATCH_OK;        	
+            return res;
+        }
         
         while(true) {
             if(data == null) {
@@ -396,13 +489,13 @@
 
     
     protected void attachInputToPlans(Tuple t) {
-        //super.attachInput(t);
-        for(PhysicalPlan p : inputPlans) {
+        //super.attachInput(t);    	
+        for(PhysicalPlan p : inputPlans) {        	
             p.attachInput(t);
         }
     }
     
-    protected void getLeaves() {
+    public void getLeaves() {
         if (inputPlans != null) {
             int i=-1;
             if(isToBeFlattenedArray == null) {

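For reference, a minimal sketch (not the committed code) of the batch-driving loop that the new accumulative branch of getNext() above implements: pull one batch into the shared buffer, signal accumulate-start, run the plan, and reattach the same input while the plan keeps answering STATUS_BATCH_OK. The *Stub types and the status byte values are hypothetical stand-ins for AccumulativeTupleBuffer and POStatus.

    import java.io.IOException;

    interface TupleBufferStub {              // mirrors AccumulativeTupleBuffer
        boolean hasNextBatch();
        void nextBatch() throws IOException;
        void clear();
    }

    abstract class BatchDriverStub {
        static final byte STATUS_OK = 0, STATUS_BATCH_OK = 5;  // values are assumptions

        byte driveBatches(TupleBufferStub buffer) throws IOException {
            while (true) {
                if (buffer.hasNextBatch()) {
                    buffer.nextBatch();   // load the next slice of tuples into the shared buffer
                    setAccumStart();      // UDFs will have accumulate() called on this batch
                } else {
                    buffer.clear();
                    setAccumEnd();        // UDFs may now be asked for their final value
                }
                byte status = processPlan();
                if (status != STATUS_BATCH_OK) {
                    return status;        // OK/EOP/ERR ends the loop for this input tuple
                }
                reattachInput();          // same input again, so the plans see the next batch
            }
        }

        abstract void setAccumStart();
        abstract void setAccumEnd();
        abstract byte processPlan() throws IOException;
        abstract void reattachInput();
    }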
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Tue Nov 24 19:54:19 2009
@@ -55,18 +55,26 @@
      */
     protected static final long serialVersionUID = 1L;
 
-    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     transient private Log log = LogFactory.getLog(getClass());
 
     protected List<PhysicalPlan> plans;
     
+    protected List<PhysicalPlan> secondaryPlans;
+    
     protected List<ExpressionOperator> leafOps;
+    
+    protected List<ExpressionOperator> secondaryLeafOps;
 
     // The position of this LR in the package operator
     protected byte index;
     
     protected byte keyType;
+    
+    protected byte mainKeyType;
+    
+    protected byte secondaryKeyType;
 
     protected boolean mIsDistinct = false;
     
@@ -83,6 +91,7 @@
     // 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key)
     // 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 1 corresponds to 2nd index in key)
     private Map<Integer, Integer> mProjectedColsMap;
+    private Map<Integer, Integer> mSecondaryProjectedColsMap;
 
     // A place holder Tuple used in distinct case where we really don't
     // have any value to pass through.  But hadoop gets cranky if we pass a
@@ -95,20 +104,25 @@
 	// is a project(*) - we set this ONLY when the project(*)
 	// is the ONLY thing in the cogroup by ..
 	private boolean mProjectStar = false;
+	private boolean mSecondaryProjectStar = false;
 
     // marker to note that the "key" is a tuple
     // this is required by POPackage to pick things
     // off the "key" correctly to stitch together the
     // "value"
     private boolean isKeyTuple = false;
+    private boolean isSecondaryKeyTuple = false;
 
     private int mProjectedColsMapSize = 0;
+    private int mSecondaryProjectedColsMapSize = 0;
 
     private ArrayList<Integer> minValuePositions;
     private int minValuePositionsSize = 0;
 
     private Tuple lrOutput;
     
+    private boolean useSecondaryKey = false;
+    
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -125,7 +139,9 @@
         super(k, rp, inp);
         index = -1;
         leafOps = new ArrayList<ExpressionOperator>();
+        secondaryLeafOps = new ArrayList<ExpressionOperator>();
         mProjectedColsMap = new HashMap<Integer, Integer>();
+        mSecondaryProjectedColsMap = new HashMap<Integer, Integer>();
         lrOutput = mTupleFactory.newTuple(3);
     }
 
@@ -246,7 +262,19 @@
             for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
+            
             List<Result> resLst = new ArrayList<Result>();
+            
+            if (secondaryPlans!=null) {
+                for (PhysicalPlan ep : secondaryPlans) {
+                    ep.attachInput((Tuple)inp.result);
+                }
+            }
+            
+            List<Result> secondaryResLst = null;
+            if (secondaryLeafOps!=null)
+                secondaryResLst = new ArrayList<Result>();
+            
             for (ExpressionOperator op : leafOps){
                 
                 switch(op.getResultType()){
@@ -285,24 +313,66 @@
                     return new Result();
                 resLst.add(res);
             }
-            res.result = constructLROutput(resLst,(Tuple)inp.result);
+            
+            if (secondaryLeafOps!=null)
+            {
+                for (ExpressionOperator op : secondaryLeafOps){
+                    
+                    switch(op.getResultType()){
+                    case DataType.BAG:
+                        res = op.getNext(dummyBag);
+                        break;
+                    case DataType.BOOLEAN:
+                        res = op.getNext(dummyBool);
+                        break;
+                    case DataType.BYTEARRAY:
+                        res = op.getNext(dummyDBA);
+                        break;
+                    case DataType.CHARARRAY:
+                        res = op.getNext(dummyString);
+                        break;
+                    case DataType.DOUBLE:
+                        res = op.getNext(dummyDouble);
+                        break;
+                    case DataType.FLOAT:
+                        res = op.getNext(dummyFloat);
+                        break;
+                    case DataType.INTEGER:
+                        res = op.getNext(dummyInt);
+                        break;
+                    case DataType.LONG:
+                        res = op.getNext(dummyLong);
+                        break;
+                    case DataType.MAP:
+                        res = op.getNext(dummyMap);
+                        break;
+                    case DataType.TUPLE:
+                        res = op.getNext(dummyTuple);
+                        break;
+                    }
+                    if(res.returnStatus!=POStatus.STATUS_OK)
+                        return new Result();
+                    secondaryResLst.add(res);
+                }
+            }
+            // If we are using secondary sort key, our new key is:
+            // (nullable, index, (key, secondary key), value)
+            res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
             
             return res;
         }
         return inp;
     }
     
-    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
-        //Construct key
+    protected Object getKeyFromResult(List<Result> resLst, byte type) throws ExecException {
         Object key;
-        
         if(resLst.size()>1){
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i=-1;
             for(Result res : resLst)
                 t.set(++i, res.result);
             key = t;           
-        } else if (resLst.size() == 1 && keyType == DataType.TUPLE) {
+        } else if (resLst.size() == 1 && type == DataType.TUPLE) {
             
             // We get here after merging multiple jobs that have different
             // map key types into a single job during multi-query optimization.
@@ -319,6 +389,21 @@
         else{
             key = resLst.get(0).result;
         }
+        return key;
+    }
+    
+    protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
+        //Construct key
+        Object key;
+        Object secondaryKey=null;
+        
+        
+        if (secondaryResLst!=null && secondaryResLst.size()>0)
+        {
+            key = getKeyFromResult(resLst, mainKeyType);
+            secondaryKey = getKeyFromResult(secondaryResLst, secondaryKeyType);
+        } else
+            key = getKeyFromResult(resLst, keyType);
         
         if (mIsDistinct) {
 
@@ -340,7 +425,15 @@
 
             //Put the index, key, and value
             //in a tuple and return
-            lrOutput.set(1, key);
+            if (useSecondaryKey)
+            {
+                Tuple compoundKey = mTupleFactory.newTuple(2);
+                compoundKey.set(0, key);
+                compoundKey.set(1, secondaryKey);
+                lrOutput.set(1, compoundKey);
+            }
+            else
+                lrOutput.set(1, key);
             
             // strip off the columns in the "value" which 
             // are present in the "key"
@@ -398,12 +491,20 @@
     }
 
     public void setKeyType(byte keyType) {
-        this.keyType = keyType;
+        if (useSecondaryKey)
+            this.mainKeyType = keyType;
+        else
+            this.keyType = keyType;
     }
 
     public List<PhysicalPlan> getPlans() {
         return plans;
     }
+    
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+        mainKeyType = keyType;
+    }
 
     public void setPlans(List<PhysicalPlan> plans) throws PlanException {
         this.plans = plans;
@@ -464,6 +565,74 @@
         }
         mProjectedColsMapSize = mProjectedColsMap.size();
     }
+    
+    public void setSecondaryPlans(List<PhysicalPlan> plans) throws PlanException {
+        this.secondaryPlans = plans;
+        secondaryLeafOps.clear();
+        int keyIndex = 0; // zero based index for fields in the key
+        for (PhysicalPlan plan : plans) {
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            secondaryLeafOps.add(leaf);
+            
+            // don't optimize CROSS
+            if(!isCross) {
+                // Look for the leaf Ops which are POProject operators - get
+                // the columns that these POProject Operators are projecting.
+                // They MUST be projecting either a column or '*'.
+                // Keep track of the columns which are being projected and
+                // the position in the "Key" where these will be projected to.
+                // Then we can use this information to strip off these columns
+                // from the "Value" and in POPackage stitch the right "Value"
+                // tuple back by getting these columns from the "key". The goal
+                // is to reduce the amount of data sent to Hadoop in the map.
+                if(leaf instanceof POProject) {
+                    POProject project = (POProject) leaf;
+                    if(project.isStar()) {
+                        if(secondaryPlans.size() == 1) {
+                            // note that we have a project *
+                            mSecondaryProjectStar  = true;
+                            // key will be a tuple in this case
+                            isSecondaryKeyTuple = true;
+                        } else {
+                            // TODO: currently "group by (*, somethingelse)" is NOT
+                            // allowed. So we should never get here. But once it is
+                            // allowed, we will need to handle it. For now just log
+                            log.debug("Project * in group by not being optimized in key-value transfer");
+                        }
+                    } else {
+                        try {
+                            List<PhysicalOperator> preds = plan.getPredecessors(leaf);
+                            if (preds==null || !(preds.get(0) instanceof POProject))
+                                mSecondaryProjectedColsMap.put(project.getColumn(), keyIndex);
+                        } catch (ExecException e) {
+                            int errCode = 2070;
+                            String msg = "Problem in accessing column from project operator.";
+                            throw new PlanException(msg, errCode, PigException.BUG);
+                        }
+                    }
+                    if(project.getResultType() == DataType.TUPLE)
+                        isSecondaryKeyTuple = true;
+                }
+                keyIndex++;
+            }
+        }
+        if(keyIndex > 1) {
+            // make a note that the "key" is a tuple
+            // this is required by POPackage to pick things
+            // off the "key" correctly to stitch together the
+            // "value"
+            isSecondaryKeyTuple  = true;
+        }
+        mainKeyType = keyType;
+        keyType = DataType.TUPLE;
+        if (plans.size()>1)
+            secondaryKeyType = DataType.TUPLE;
+        else
+        {
+            secondaryKeyType = plans.get(0).getLeaves().get(0).getResultType();
+        }
+        mSecondaryProjectedColsMapSize = mSecondaryProjectedColsMap.size();
+    }
 
     /**
      * Make a deep copy of this operator.  
@@ -488,6 +657,9 @@
             throw cnse;
         }
         clone.keyType = keyType;
+        clone.mainKeyType = mainKeyType;
+        clone.secondaryKeyType = secondaryKeyType;
+        clone.useSecondaryKey = useSecondaryKey;
         clone.index = index;
         try {
             clone.lrOutput.set(0, index);
@@ -516,6 +688,13 @@
     public Map<Integer, Integer> getProjectedColsMap() {
         return mProjectedColsMap;
     }
+    
+    /**
+     * @return the mProjectedColsMap
+     */
+    public Map<Integer, Integer> getSecondaryProjectedColsMap() {
+        return mSecondaryProjectedColsMap;
+    }
 
     /**
      * @return the mProjectStar
@@ -525,6 +704,13 @@
     }
 
     /**
+     * @return the mProjectStar
+     */
+    public boolean isSecondaryProjectStar() {
+        return mSecondaryProjectStar;
+    }
+
+    /**
      * @return the keyTuple
      */
     public boolean isKeyTuple() {
@@ -532,6 +718,13 @@
     }
 
     /**
+     * @return the keyTuple
+     */
+    public boolean isSecondaryKeyTuple() {
+        return isSecondaryKeyTuple;
+    }
+
+    /**
      * @param plans
      * @throws ExecException 
      */

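For reference, when useSecondaryKey is set the map output above takes the shape (nullable, index, (key, secondary key), value). A hedged sketch of the compound-key assembly, using Pig's TupleFactory as the committed code does; keyObj and secondaryKeyObj are hypothetical stand-ins for the values produced by the main and secondary key plans.

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    class CompoundKeySketch {
        private static final TupleFactory tf = TupleFactory.getInstance();

        // Field 1 of the (index, key, value) output tuple becomes this
        // two-field tuple when a secondary sort key is in use.
        static Tuple makeCompoundKey(Object keyObj, Object secondaryKeyObj)
                throws ExecException {
            Tuple compound = tf.newTuple(2);
            compound.set(0, keyObj);           // main key: drives partitioning/grouping
            compound.set(1, secondaryKeyObj);  // secondary key: orders values within a group
            return compound;
        }
    }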
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Tue Nov 24 19:54:19 2009
@@ -76,7 +76,7 @@
             mIsDistinct + ") - " + mKey.toString();
     }
 
-    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+    protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
         //Construct key
         Object key;
         if(resLst.size()>1){

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Tue Nov 24 19:54:19 2009
@@ -64,8 +64,6 @@
     private List<POPackage> packages = new ArrayList<POPackage>();
 
     transient private PigNullableWritable myKey;
-    
-    private int baseIndex = 0;      
 
     /**
      * Constructs an operator with the specified key.
@@ -111,7 +109,7 @@
 
     @Override
     public String name() {
-        return "MultiQuery Package[" + baseIndex +"] - " +  getOperatorKey().toString();
+        return "MultiQuery Package  - " +  getOperatorKey().toString();
     }
 
     @Override
@@ -174,7 +172,6 @@
 
         int index = (int)origIndex;
         index &= idxPart;
-        index -= baseIndex;
         
         if (index >= packages.size() || index < 0) {
             int errCode = 2140;
@@ -221,21 +218,4 @@
         return res;
     }
 
-    /**
-     * Sets the base index of this operator
-     * 
-     * @param baseIndex the base index of this operator
-     */
-    public void setBaseIndex(int baseIndex) {
-        this.baseIndex = baseIndex;
-    }
-
-    /**
-     * Returns the base index of this operator
-     * 
-     * @return the base index of this operator
-     */
-    public int getBaseIndex() {
-        return baseIndex;
-    }      
 }

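With baseIndex gone, the target package above is recovered purely by masking the index bits. A sketch of that demultiplexing; IDX_PART is a hypothetical stand-in for the idxPart mask, assumed here to keep the low seven bits with the high bit reserved as a multi-query flag.

    class IndexDemuxSketch {
        static final int IDX_PART = 0x7F;   // assumption: low bits carry the package index

        static int packageIndex(byte origIndex, int numPackages) {
            int index = origIndex;
            index &= IDX_PART;              // strip flag bits; no base offset any more
            if (index >= numPackages || index < 0) {
                throw new IllegalStateException("package index " + index + " out of range");
            }
            return index;
        }
    }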
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Nov 24 19:54:19 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -25,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.DataBag;
@@ -89,6 +92,9 @@
     //key, no value.
     int numInputs;
     
+    // If the attached map-reduce plan uses a secondary sort key
+    boolean useSecondaryKey = false;
+    
     //Denotes if inner is specified
     //on a particular input
     boolean[] inner;
@@ -107,8 +113,8 @@
     
     transient private final Log log = LogFactory.getLog(getClass());
 
-    protected static BagFactory mBagFactory = BagFactory.getInstance();
-    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -157,6 +163,15 @@
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
         tupIter = inp;
         key = k.getValueAsPigType();
+        if (useSecondaryKey)
+        {
+            try {
+                key = ((Tuple)key).get(0);
+            } catch (ExecException e) {
+                // TODO Exception
+                throw new RuntimeException(e);
+            }
+        }
         if(isKeyTuple) {
             // key is a tuple, cache the key as a
             // tuple for use in the getNext()
@@ -187,7 +202,7 @@
     public void setInner(boolean[] inner) {
         this.inner = inner;
     }
-
+   
     /**
      * From the inputs, constructs the output tuple
      * for this co-group in the required format which
@@ -206,38 +221,51 @@
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
                  
-            String bagType = null;
-            if (PigMapReduce.sJobConf != null) {
-       			bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
-       	    }
-            
-        	for (int i = 0; i < numInputs; i++) {        		          	           		
-        	    if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
-           			dbs[i] = mBagFactory.newDefaultBag();           			
-           	    } else {
-        	    	dbs[i] = new InternalCachedBag(numInputs);
-        	    }
-        	}      
-                           
-            //For each indexed tup in the inp, sort them
-            //into their corresponding bags based
-            //on the index
-            while (tupIter.hasNext()) {
-                NullableTuple ntup = tupIter.next();
-                int index = ntup.getIndex();
-                Tuple copy = getValueTuple(ntup, index);  
+            if (isAccumulative()) {
+                // create bag wrappers to pull tuples in many batches;
+                // all bags have a reference to the same tuple buffer,
+                // which contains the tuples from one batch
+                POPackageTupleBuffer buffer = new POPackageTupleBuffer();
+                for (int i = 0; i < numInputs; i++) {
+                    dbs[i] = new AccumulativeBag(buffer, i);
+                }
                 
-                if (numInputs == 1) {
+            } else {
+                // create bag to pull all tuples out of iterator
+                String bagType = null;
+                if (PigMapReduce.sJobConf != null) {
+                       bagType = PigMapReduce.sJobConf.get("pig.cachedbag.type");       			
+                   }
+                                
+
+                for (int i = 0; i < numInputs; i++) {        		          	           		
+                    if (bagType != null && bagType.equalsIgnoreCase("default")) {        	    	
+                           dbs[i] = mBagFactory.newDefaultBag();           			
+                       } else {
+                        dbs[i] = new InternalCachedBag(numInputs);
+                    }
+                }      
+                               
+                //For each indexed tup in the inp, sort them
+                //into their corresponding bags based
+                //on the index
+                while (tupIter.hasNext()) {
+                    NullableTuple ntup = tupIter.next();
+                    int index = ntup.getIndex();
+                    Tuple copy = getValueTuple(ntup, index);  
                     
-                    // this is for multi-query merge where 
-                    // the numInputs is always 1, but the index
-                    // (the position of the inner plan in the 
-                    // enclosed operator) may not be 1.
-                    dbs[0].add(copy);
-                } else {
-                    dbs[index].add(copy);
+                    if (numInputs == 1) {
+                        
+                        // this is for multi-query merge where 
+                        // the numInputs is always 1, but the index
+                        // (the position of the inner plan in the 
+                        // enclosed operator) may not be 1.
+                        dbs[0].add(copy);
+                    } else {
+                        dbs[index].add(copy);
+                    }
+                    if(reporter!=null) reporter.progress();
                 }
-                if(reporter!=null) reporter.progress();
             }
                       
             //Construct the output tuple by appending
@@ -247,14 +275,16 @@
             res.set(0,key);
             int i=-1;
             for (DataBag bag : dbs) {
-                if(inner[++i]){
+                i++;
+                if(inner[i] && !isAccumulative()){
                     if(bag.size()==0){
                         detachInput();
                         Result r = new Result();
                         r.returnStatus = POStatus.STATUS_NULL;
                         return r;
                     }
-                }
+                } 
+                
                 res.set(i+1,bag);
             }
         }
@@ -403,5 +433,75 @@
     public void setDistinct(boolean distinct) {
         this.distinct = distinct;
     }
+    
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
 
+    private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+        private List<Tuple>[] bags;
+        private Iterator<NullableTuple> iter;
+        private int batchSize;
+        private Object currKey;
+
+        @SuppressWarnings("unchecked")
+        public POPackageTupleBuffer() {    		
+            batchSize = 20000;
+            if (PigMapReduce.sJobConf != null) {
+                String size = PigMapReduce.sJobConf.get("pig.accumulative.batchsize");
+                if (size != null) {
+                    batchSize = Integer.parseInt(size);
+                }
+            }		
+            
+            this.bags = new List[numInputs];
+            for(int i=0; i<numInputs; i++) {
+                this.bags[i] = new ArrayList<Tuple>();
+            }
+            this.iter = tupIter;
+            this.currKey = key;
+        }
+        
+        @Override
+        public boolean hasNextBatch() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public void nextBatch() throws IOException {
+            for(int i=0; i<bags.length; i++) {
+                bags[i].clear();
+            }
+                        
+            key = currKey;			
+            for(int i=0; i<batchSize; i++) {
+                if (iter.hasNext()) {
+                     NullableTuple ntup = iter.next();
+                     int index = ntup.getIndex();
+                     Tuple copy = getValueTuple(ntup, index);		            
+                     if (numInputs == 1) {
+                            
+                            // this is for multi-query merge where 
+                            // the numInputs is always 1, but the index
+                            // (the position of the inner plan in the 
+                            // enclosed operator) may not be 1.
+                            bags[0].add(copy);
+                     } else {
+                            bags[index].add(copy);
+                     }
+                }
+            }
+        } 
+        
+        public void clear() {
+            for(int i=0; i<bags.length; i++) {
+                bags[i].clear();
+            }
+            iter = null;
+        }
+        
+        public Iterator<Tuple> getTuples(int index) {			
+            return bags[index].iterator();
+        }
+       };
 }

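Two things worth noting in the POPackage change: attachInput() now keeps only field 0 of the compound key when useSecondaryKey is set, and in accumulative mode the per-input bags are AccumulativeBag views over one shared POPackageTupleBuffer. A minimal sketch of the buffer's per-batch draining, assuming a plain Iterator<Tuple> in place of the NullableTuple iterator and a single input:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.pig.data.Tuple;

    class BatchBufferSketch {
        private final List<Tuple> batch = new ArrayList<Tuple>();
        private final Iterator<Tuple> iter;
        private final int batchSize;  // 20000 unless pig.accumulative.batchsize overrides it

        BatchBufferSketch(Iterator<Tuple> iter, int batchSize) {
            this.iter = iter;
            this.batchSize = batchSize;
        }

        boolean hasNextBatch() {
            return iter.hasNext();
        }

        void nextBatch() {
            batch.clear();                                     // drop the previous slice
            for (int i = 0; i < batchSize && iter.hasNext(); i++) {
                batch.add(iter.next());                        // pull at most batchSize tuples
            }
        }

        Iterator<Tuple> getTuples() {
            return batch.iterator();                           // view over the current batch
        }
    }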
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Tue Nov 24 19:54:19 2009
@@ -23,6 +23,10 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.pig.impl.util.Pair;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -96,6 +100,10 @@
     public POPackageLite clone() throws CloneNotSupportedException {
         POPackageLite clone = (POPackageLite)super.clone();
         clone.inner = null;
+        clone.keyInfo = new HashMap<Integer, Pair<Boolean,Map<Integer,Integer>>>();
+        for (Entry<Integer, Pair<Boolean, Map<Integer,Integer>>> entry: keyInfo.entrySet()) {
+            clone.keyInfo.put(entry.getKey(), entry.getValue());
+        }
         return clone;
     }
     

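The clone() fix above gives the copy its own keyInfo map so the two operators can diverge; the entry copy is shallow, so the Pair values are still shared. The loop is equivalent to the HashMap copy-constructor idiom sketched here in isolation:

    import java.util.HashMap;
    import java.util.Map;

    class ShallowMapCopySketch {
        // New map, same value objects: mutating the clone's map is safe,
        // mutating a shared value still affects both operators.
        static <K, V> Map<K, V> copyKeyInfo(Map<K, V> src) {
            return new HashMap<K, V>(src);
        }
    }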
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Tue Nov 24 19:54:19 2009
@@ -207,7 +207,7 @@
 
 	// Returns bag of tuples 
     protected DataBag constructPROutput(List<Result> resLst, Tuple value) throws ExecException{
-		Tuple t = super.constructLROutput(resLst, value);
+		Tuple t = super.constructLROutput(resLst, null, value);
 
         //Construct key
         Object key = t.get(1);