You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/09 21:23:51 UTC

svn commit: r703233 - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physic...

Author: olga
Date: Thu Oct  9 12:23:51 2008
New Revision: 703233

URL: http://svn.apache.org/viewvc?rev=703233&view=rev
Log:
PIG-465: perf improvement - removing keys from the value

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Thu Oct  9 12:23:51 2008
@@ -275,3 +275,6 @@
     PIG-457: report 100% on successful jobs only (shravanmn via olgan)
 
     PIG-471: ignoring status errors from hadoop (pradeepk via olgan)
+
+    PIG-465: performance improvement - removing keys from the value (pradeepk
+    via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct  9 12:23:51 2008
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.pig.data.DataType;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -393,7 +394,7 @@
         func.setAlgebraicFunction(type);
     }
 
-    private void fixUpRearrange(POLocalRearrange rearrange) {
+    private void fixUpRearrange(POLocalRearrange rearrange) throws ExecException {
         // Set the projection to be the key
         PhysicalPlan newPlan = new PhysicalPlan();
         String scope = rearrange.getOperatorKey().scope;
@@ -404,8 +405,7 @@
         newPlan.add(proj);
         List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>(1);
         plans.add(newPlan);
-        rearrange.setPlans(plans);
-        rearrange.setIndex(mKeyField);
+        rearrange.setPlansFromCombiner(plans);
     }
 
     private class AlgebraicPlanChecker extends PhyPlanVisitor {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Oct  9 12:23:51 2008
@@ -381,7 +381,7 @@
                     Class comparator = PigContext.resolveClassName(compFuncSpec);
                     if(ComparisonFunc.class.isAssignableFrom(comparator)) {
                         jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
-                        pack.setKeyType(DataType.TUPLE);
+                        jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
                         jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                         jobConf.set("pig.usercomparator", "true");
                         jobConf.setOutputKeyClass(NullableTuple.class);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Thu Oct  9 12:23:51 2008
@@ -21,6 +21,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -118,7 +119,11 @@
             CombinerOptimizer co = new CombinerOptimizer(plan);
             co.visit();
         }
-
+        
+        // optimize key - value handling in package
+        POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
+        pkgAnnotator.visit();
+        
         // check whether stream operator is present
         MRStreamHandler checker = new MRStreamHandler(plan);
         checker.visit();

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Oct  9 12:23:51 2008
@@ -36,6 +36,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -136,6 +137,10 @@
             co.visit();
         }
         
+        // optimize key - value handling in package
+        POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
+        pkgAnnotator.visit();
+        
         // check whether stream operator is present
         MRStreamHandler checker = new MRStreamHandler(plan);
         checker.visit();

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Oct  9 12:23:51 2008
@@ -48,6 +48,7 @@
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.WrappedIOException;
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -95,14 +96,26 @@
         }
     }
     
+    /**
+     * This "specialized" map class is ONLY to be used in pig queries with
+     * order by a udf. A UDF used for comparison in the order by expects
+     * to be handed tuples. Hence this map class ensures that the "key" used
+     * in the order by is wrapped into a tuple (if it isn't already a tuple)
+     */
     public static class MapWithComparator extends PigMapBase implements
             Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
 
         @Override
         public void collect(OutputCollector<PigNullableWritable, Writable> oc,
                 Tuple tuple) throws ExecException, IOException {
-            Object k = tuple.get(1);
-            Tuple keyTuple = tf.newTuple(k);
+            Object keyTuple = null;
+            if(keyType != DataType.TUPLE) {
+                Object k = tuple.get(1);
+                keyTuple = tf.newTuple(k);
+            } else {
+                keyTuple = tuple.get(1);
+            }
+            
 
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
@@ -121,23 +134,23 @@
     public static class Reduce extends MapReduceBase
             implements
             Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        private final Log log = LogFactory.getLog(getClass());
+        protected final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
-        private PhysicalPlan rp;
+        protected PhysicalPlan rp;
         
         //The POPackage operator which is the
         //root of every Map Reduce plan is
         //obtained through the job conf. The portion
         //remaining after its removal is the reduce
         //plan
-        private POPackage pack;
+        protected POPackage pack;
         
         ProgressableReporter pigReporter;
 
-        private OutputCollector<PigNullableWritable, Writable> outputCollector;
+        protected OutputCollector<PigNullableWritable, Writable> outputCollector;
 
-        private boolean errorInReduce = false;
+        protected boolean errorInReduce = false;
         
         /**
          * Configures the Reduce plan, the POPackage operator
@@ -231,7 +244,7 @@
          * @throws ExecException 
          * @throws IOException 
          */
-        private void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+        protected void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
             while(true)
             {
                 Tuple dummyTuple = null;  
@@ -307,4 +320,98 @@
         }
     }
     
+    /**
+     * This "specialized" reduce class is ONLY to be used in pig queries with
+     * order by a udf. A UDF used for comparison in the order by expects
+     * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
+     * ensures that the "key" used in the order by is wrapped into a tuple (if it 
+     * isn't already a tuple). This reduce class unwraps this tuple in the case where
+     * the map had wrapped into a tuple and hands the "unwrapped" key to the POPackage
+     * for processing
+     */
+    public static class ReduceWithComparator extends PigMapReduce.Reduce {
+        
+        private byte keyType;
+        
+        /**
+         * Configures the Reduce plan, the POPackage operator
+         * and the reporter thread
+         */
+        @Override
+        public void configure(JobConf jConf) {
+            super.configure(jConf);
+            keyType = pack.getKeyType();
+        }
+
+        /**
+         * The reduce function which packages the key and List&lt;Tuple&gt;
+         * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
+         * The package result is either collected as is, if the reduce plan is
+         * empty or after passing through the reduce plan.
+         */
+        public void reduce(PigNullableWritable key,
+                Iterator<NullableTuple> tupIter,
+                OutputCollector<PigNullableWritable, Writable> oc,
+                Reporter reporter) throws IOException {
+            
+            // cache the collector for use in runPipeline()
+            // which could additionally be called from close()
+            this.outputCollector = oc;
+            pigReporter.setRep(reporter);
+            
+            // If the keyType is not a tuple, the MapWithComparator.collect()
+            // would have wrapped the key into a tuple so that the 
+            // comparison UDF used in the order by can process it.
+            // We need to unwrap the key out of the tuple and hand it
+            // to the POPackage for processing
+            if(keyType != DataType.TUPLE) {
+                Tuple t = (Tuple)(key.getValueAsPigType());
+                try {
+                    key = HDataType.getWritableComparableTypes(t.get(0), keyType);
+                } catch (ExecException e) {
+                    throw WrappedIOException.wrap(e);
+                }
+            }
+            
+            pack.attachInput(key, tupIter);
+            
+            try {
+                Tuple t=null;
+                Result res = pack.getNext(t);
+                if(res.returnStatus==POStatus.STATUS_OK){
+                    Tuple packRes = (Tuple)res.result;
+                    
+                    if(rp.isEmpty()){
+                        oc.collect(null, packRes);
+                        return;
+                    }
+                    
+                    rp.attachInput(packRes);
+
+                    List<PhysicalOperator> leaves = rp.getLeaves();
+                    
+                    PhysicalOperator leaf = leaves.get(0);
+                    runPipeline(leaf);
+                    
+                }
+                
+                if(res.returnStatus==POStatus.STATUS_NULL) {
+                    return;
+                }
+                
+                if(res.returnStatus==POStatus.STATUS_ERR){
+                    IOException ioe = new IOException("Packaging error while processing group");
+                    throw ioe;
+                }
+                    
+                
+            } catch (ExecException e) {
+                IOException ioe = new IOException(e.getMessage());
+                ioe.initCause(e.getCause());
+                throw ioe;
+            }
+        }
+
+    }
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct  9 12:23:51 2008
@@ -76,6 +76,10 @@
         //do nothing
     }
     
+    public void visitCombinerPackage(POPostCombinerPackage pkg) throws VisitorException{
+        //do nothing
+    }
+    
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -231,5 +235,19 @@
 	
     }
 
+    /**
+     * @param lrfi the local rearrange (illustrate variant) whose inner plans are visited
+     * @throws VisitorException 
+     */
+    public void visitLocalRearrangeForIllustrate(
+            POLocalRearrangeForIllustrate lrfi) throws VisitorException {
+        List<PhysicalPlan> inpPlans = lrfi.getPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+        }
+        
+    }
+
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Oct  9 12:23:51 2008
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +32,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -47,31 +50,59 @@
     /**
      * 
      */
-    private static final long serialVersionUID = 1L;
+    protected static final long serialVersionUID = 1L;
 
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     private Log log = LogFactory.getLog(getClass());
 
-    List<PhysicalPlan> plans;
+    protected List<PhysicalPlan> plans;
     
-    List<ExpressionOperator> leafOps;
+    protected List<ExpressionOperator> leafOps;
 
     // The position of this LR in the package operator
-    byte index;
+    protected byte index;
     
-    byte keyType;
+    protected byte keyType;
 
-    private boolean mIsDistinct = false;
+    protected boolean mIsDistinct = false;
     
-    private boolean isCross = false;
+    protected boolean isCross = false;
+    
+    // map to store mapping of projected columns to
+    // the position in the "Key" where these will be projected to.
+    // We use this information to strip off these columns
+    // from the "Value" and in POPackage stitch the right "Value"
+    // tuple back by getting these columns from the "key". The goal
+    // is to reduce the amount of the data sent to Hadoop in the map.
+    // Example: a  = load 'bla'; b = load 'bla'; c = cogroup a by ($2, $3), b by ($1, $2)
+    // For the first input (a), the map would contain following key:value
+    // 2:0 (2 corresponds to $2 in cogroup a by ($2, $3) and 0 corresponds to 1st index in key)
+    // 3:1 (3 corresponds to $3 in cogroup a by ($2, $3) and 1 corresponds to 2nd index in key)
+    private Map<Integer, Integer> mProjectedColsMap;
 
     // A place holder Tuple used in distinct case where we really don't
     // have any value to pass through.  But hadoop gets cranky if we pass a
     // null, so we'll just create one instance of this empty tuple and
     // pass it for every row.  We only get around to actually creating it if
     // mIsDistinct is set to true.
-    private Tuple mFakeTuple = null;
+    protected Tuple mFakeTuple = null;
+
+	// indicator whether the project in the inner plans
+	// is a project(*) - we set this ONLY when the project(*)
+	// is the ONLY thing in the cogroup by ..
+	private boolean mProjectStar = false;
+
+    // marker to note that the "key" is a tuple
+    // this is required by POPackage to pick things
+    // off the "key" correctly to stitch together the
+    // "value"
+    private boolean isKeyTuple = false;
+
+    private int mProjectedColsMapSize = 0;
+
+    private ArrayList<Integer> minValuePositions;
+    private int minValuePositionsSize = 0;
 
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
@@ -89,6 +120,7 @@
         super(k, rp, inp);
         index = -1;
         leafOps = new ArrayList<ExpressionOperator>();
+        mProjectedColsMap = new HashMap<Integer, Integer>();
     }
 
     @Override
@@ -206,12 +238,13 @@
                 resLst.add(res);
             }
             res.result = constructLROutput(resLst,(Tuple)inp.result);
+            
             return res;
         }
         return inp;
     }
     
-    private Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
         //Construct key
         Object key;
         if(resLst.size()>1){
@@ -234,17 +267,70 @@
             output.set(1, key);
             output.set(2, mFakeTuple);
             return output;
+        } else if(isCross){
+        
+            for(int i=0;i<plans.size();i++)
+                value.getAll().remove(0);
+            //Put the index, key, and value
+            //in a tuple and return
+            output.set(0, new Byte(index));
+            output.set(1, key);
+            output.set(2, value);
+            return output;
         } else {
-            if(isCross){
-                for(int i=0;i<plans.size();i++)
-                    value.getAll().remove(0);
-            }
 
             //Put the index, key, and value
             //in a tuple and return
             output.set(0, new Byte(index));
             output.set(1, key);
-            output.set(2, value);
+            
+            // strip off the columns in the "value" which 
+            // are present in the "key"
+            if(mProjectedColsMapSize != 0 || mProjectStar == true) {
+
+                Tuple minimalValue = null;
+                if(!mProjectStar) {
+                    if(minValuePositions == null) {
+                        // the very first time, we will have to build
+                        // the "value" tuple piecemeal but we can
+                        // do better next time round
+                        minValuePositions = new ArrayList<Integer>();
+                        minimalValue = mTupleFactory.newTuple();
+                        // look for individual columns that we are
+                        // projecting
+                        for (int i = 0; i < value.size(); i++) {
+                            if(mProjectedColsMap.get(i) == null) {
+                                // this column was not found in the "key"
+                                // so send it in the "value"
+                                minimalValue.append(value.get(i));
+                                minValuePositions.add(i);
+                            }
+                        }
+                        minValuePositionsSize = minValuePositions.size();
+                    } else {
+                        minimalValue = mTupleFactory.newTuple(minValuePositionsSize);
+                        for(int i = 0; i < minValuePositionsSize; i++) {
+                            minimalValue.set(i, value.get(minValuePositions.get(i)));
+                        }
+                    }
+                } else {
+                    // for the project star case
+                    // we would send out an empty tuple as
+                    // the "value" since all elements are in the
+                    // "key"
+                    minimalValue = mTupleFactory.newTuple();
+    
+                }
+                output.set(2, minimalValue);
+            
+            } else {
+            
+                // there were no columns in the "key"
+                // which we can strip off from the "value"
+                // so just send the value we got
+                output.set(2, value);
+                
+            }
             return output;
         }
     }
@@ -264,9 +350,53 @@
     public void setPlans(List<PhysicalPlan> plans) {
         this.plans = plans;
         leafOps.clear();
+        int keyIndex = 0; // zero based index for fields in the key
         for (PhysicalPlan plan : plans) {
-            leafOps.add((ExpressionOperator)plan.getLeaves().get(0));
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            leafOps.add(leaf);
+            
+            // don't optimize CROSS
+            if(!isCross) {
+                // Look for the leaf Ops which are POProject operators - get
+                // the columns that these POProject Operators are projecting.
+                // They MUST be projecting either a column or '*'.
+                // Keep track of the columns which are being projected and
+                // the position in the "Key" where these will be projected to.
+                // Then we can use this information to strip off these columns
+                // from the "Value" and in POPackage stitch the right "Value"
+                // tuple back by getting these columns from the "key". The goal
+                // is to reduce the amount of data sent to Hadoop in the map.
+                if(leaf instanceof POProject) {
+                    POProject project = (POProject) leaf;
+                    if(project.isStar()) {
+                        if(plans.size() == 1) {
+                            // note that we have a project *
+                            mProjectStar  = true;
+                            // key will be a tuple in this case
+                            isKeyTuple = true;
+                        } else {
+                            // TODO: currently "group by (*, somethingelse)" is NOT
+                            // allowed. So we should never get here. But once it is
+                            // allowed, we will need to handle it. For now just log
+                            log.debug("Project * in group by not being optimized in key-value transfer");
+                        }
+                    } else {
+                        mProjectedColsMap.put(project.getColumn(), keyIndex);
+                    }
+                    if(project.getResultType() == DataType.TUPLE)
+                        isKeyTuple = true;
+                }
+                keyIndex++;
+            }
+        }
+        if(keyIndex > 1) {
+            // make a note that the "key" is a tuple
+            // this is required by POPackage to pick things
+            // off the "key" correctly to stitch together the
+            // "value"
+            isKeyTuple  = true;
         }
+        mProjectedColsMapSize = mProjectedColsMap.size();
     }
 
     /**
@@ -301,5 +431,74 @@
         this.isCross = isCross;
     }
 
+    /**
+     * @return the mProjectedColsMap
+     */
+    public Map<Integer, Integer> getProjectedColsMap() {
+        return mProjectedColsMap;
+    }
+
+    /**
+     * @return the mProjectStar
+     */
+    public boolean isProjectStar() {
+        return mProjectStar;
+    }
+
+    /**
+     * @return the keyTuple
+     */
+    public boolean isKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @param plans the inner plans to set on this operator
+     * @throws ExecException 
+     */
+    public void setPlansFromCombiner(List<PhysicalPlan> plans) throws ExecException {
+        this.plans = plans;
+        leafOps.clear();
+        mProjectedColsMap.clear();
+        int keyIndex = 0; // zero based index for fields in the key
+        for (PhysicalPlan plan : plans) {
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            leafOps.add(leaf);
+            
+            // don't optimize CROSS
+            if(!isCross) {
+                // Look for the leaf Ops which are POProject operators - get
+                // the columns that these POProject Operators are projecting.
+                // Keep track of the columns which are being projected and
+                // the position in the "Key" where these will be projected to.
+                // Then we can use this information to strip off these columns
+                // from the "Value" and in POPostCombinerPackage stitch the right "Value"
+                // tuple back by getting these columns from the "key". The goal
+                // is to reduce the amount of data sent to Hadoop in the map.
+                if(leaf instanceof POProject) {
+                    POProject project = (POProject) leaf;
+                    if(project.isStar()) {
+                        log.error("Unexpected data during optimization");
+                        throw new ExecException("Unexpected data during optimization (Local rearrange" +
+                                " in combiner has a project *" );
+                    } else {
+                        mProjectedColsMap.put(project.getColumn(), keyIndex);
+                    }
+                    if(project.getResultType() == DataType.TUPLE)
+                        isKeyTuple = true;
+                }
+                keyIndex++;
+            }
+        }
+        if(keyIndex > 1) {
+            // make a note that the "key" is a tuple
+            // this is required by POPackage to pick things
+            // off the "key" correctly to stitch together the
+            // "value"
+            isKeyTuple  = true;
+        }
+        mProjectedColsMapSize  = mProjectedColsMap.size();
+        
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Oct  9 12:23:51 2008
@@ -17,8 +17,10 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 /**
  * The package operator that packages
  * the globally rearranged tuples into
@@ -64,6 +67,11 @@
     //The key being worked on
     Object key;
     
+    // marker to indicate if key is a tuple
+    protected boolean isKeyTuple = false;
+    // key as a Tuple object (if the key is a tuple)
+    protected Tuple keyAsTuple;
+    
     //key's type
     byte keyType;
 
@@ -76,6 +84,14 @@
     //on a particular input
     boolean[] inner;
     
+    // A mapping of input index to key information obtained from the LocalRearrange
+    // for that index. The key information is a pair of (Boolean, Map).
+    // The boolean indicates whether there is a lone project(*) in the 
+    // cogroup by. If not, the Map has a mapping of column numbers in the 
+    // "value" to column numbers in the "key" which contain the fields in
+    // the "value"
+    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+    
     private final Log log = LogFactory.getLog(getClass());
 
     protected static BagFactory mBagFactory = BagFactory.getInstance();
@@ -96,6 +112,7 @@
     public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
         numInputs = -1;
+        keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
     }
 
     @Override
@@ -127,6 +144,11 @@
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
         tupIter = inp;
         key = k.getValueAsPigType();
+        if(isKeyTuple) {
+            // key is a tuple, cache the key as a
+            // tuple for use in the getNext()
+            keyAsTuple = (Tuple)key;
+        }
     }
 
     /**
@@ -183,8 +205,64 @@
                 copy.set(i, val.get(i));
             }
             */
-            Tuple copy = mTupleFactory.newTuple(val.getAll());
-            if (numInputs > 0) dbs[ntup.getIndex()].add(copy);
+            
+            Tuple copy = null;
+            // The "value (val)" that we just got may not
+            // be the complete "value". It may have some portions
+            // in the "key" (look in POLocalRearrange for more comments)
+            // If this is the case we need to stitch
+            // the "value" together.
+            int index = ntup.getIndex();
+            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+                keyInfo.get(index);
+            boolean isProjectStar = lrKeyInfo.first;
+            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+            int keyLookupSize = keyLookup.size();
+
+            if( keyLookupSize > 0) {
+            
+                // we have some fields of the "value" in the
+                // "key".
+                copy = mTupleFactory.newTuple();
+                int finalValueSize = keyLookupSize + val.size();
+                int valIndex = 0; // an index for accessing elements from 
+                                  // the value (val) that we have currently
+                for(int i = 0; i < finalValueSize; i++) {
+                    Integer keyIndex = keyLookup.get(i);
+                    if(keyIndex == null) {
+                        // the field for this index is not in the
+                        // key - so just take it from the "value"
+                        // we were handed
+                        copy.append(val.get(valIndex));
+                        valIndex++;
+                    } else {
+                        // the field for this index is in the key
+                        if(isKeyTuple) {
+                            // the key is a tuple, extract the
+                            // field out of the tuple
+                            copy.append(keyAsTuple.get(keyIndex));
+                        } else {
+                            copy.append(key);
+                        }
+                    }
+                }
+                
+            } else if (isProjectStar) {
+                
+                log.info("In project star, keyAsTuple:" + keyAsTuple);
+                // the whole "value" is present in the "key"
+                copy = mTupleFactory.newTuple(keyAsTuple.getAll());
+                
+            } else {
+                
+                // there is no field of the "value" in the
+                // "key" - so just make a copy of what we got
+                // as the "value"
+                copy = mTupleFactory.newTuple(val.getAll());
+                
+            }
+            
+            if (numInputs > 0) dbs[index].add(copy);
             if(reporter!=null) reporter.progress();
         }
         
@@ -241,5 +319,26 @@
         return clone;
     }
 
+    /**
+     * @param keyInfo the keyInfo to set
+     */
+    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+    }
+
+    /**
+     * @param keyTuple true if the key is a tuple, false otherwise
+     */
+    public void setKeyTuple(boolean keyTuple) {
+        this.isKeyTuple = keyTuple;
+    }
+
+    /**
+     * @return the keyInfo
+     */
+    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+        return keyInfo;
+    }
+
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Thu Oct  9 12:23:51 2008
@@ -19,6 +19,7 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 /**
  * The package operator that packages the globally rearranged tuples into
  * output format after the combiner stage.  It differs from POPackage in that
@@ -98,15 +100,39 @@
         while (tupIter.hasNext()) {
             NullableTuple ntup = tupIter.next();
             Tuple tup = (Tuple)ntup.getValueAsPigType();
-            for (int i = 0; i < tup.size(); i++) {
-                if (mBags[i]) ((DataBag)fields[i]).add((Tuple)tup.get(i));
-                else fields[i] = tup.get(i);
+            // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+            // group case and not in cogroups. So there should only
+            // be one LocalRearrange from which we get the keyInfo for
+            // which field in the value is in the key. This LocalRearrange
+            // has an index of 0. When we do support combiner in Cogroups
+            // THIS WILL NEED TO BE REVISITED.
+            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+                keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+            int tupIndex = 0; // an index for accessing elements from 
+                              // the value (tup) that we have currently
+            for(int i = 0; i < mBags.length; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if(keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed - Currently THIS HAS TO BE A BAG
+                    // In future if this changes, THIS WILL NEED TO BE
+                    // REVISITED.
+                    ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+                    tupIndex++;
+                } else {
+                    // the field for this index is in the key
+                    fields[i] = key;
+                }
             }
         }
         
-        //Construct the output tuple by appending
-        //the key and all the above constructed bags
-        //and return it.
+        // The successor of the POPostCombinerPackage as of 
+        // now SHOULD be a POForeach which has been adjusted
+        // to look for the key in the right place - so we will
+        // NOT be adding the key in the result here but merely
+        // putting all bags into a result tuple and returning it. 
         Tuple res;
         res = mTupleFactory.newTuple(mBags.length);
         for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Oct  9 12:23:51 2008
@@ -10,6 +10,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
@@ -52,7 +53,7 @@
         for(LogicalOperator lo : inputs) {
             List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);
             
-            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+            POLocalRearrangeForIllustrate physOp = new POLocalRearrangeForIllustrate(new OperatorKey(
                     scope, nodeGen.getNextNodeId(scope)), cg
                     .getRequestedParallelism());
             List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/Pair.java Thu Oct  9 12:23:51 2008
@@ -34,4 +34,12 @@
         first = f;
         second = s;
     }
+    
+    /* (non-Javadoc)
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return "[" + first.toString() +"," + second.toString() + "]";
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Oct  9 12:23:51 2008
@@ -84,15 +84,29 @@
         int size=0;
         for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
             Tuple t = (Tuple)res.result;
+            String key = (String)t.get(1);
             Tuple val = (Tuple)t.get(2);
+            // The input data has 2 columns of which the first
+            // is the key
+            // With the optimized LocalRearrange, the part
+            // of the "value" present in the "key" is 
+            // excluded from the "value". So to reconstruct
+            // the true "value", create a tuple with "key" in
+            // first position and the "value" (val) we currently
+            // have in the second position
+            assertEquals(1, val.size());
+            
+            Tuple actualVal = new DefaultTuple();
+            actualVal.append(key);
+            actualVal.append(val.get(0));
             //Check if the index is same as input index
             assertEquals((byte)0, (byte)(Byte)t.get(0));
             
             //Check if the input bag contains the value tuple
-            assertTrue(TestHelper.bagContains(db, val));
+            assertTrue(TestHelper.bagContains(db, actualVal));
             
             //Check if the input key and the output key are same
-            String inpKey = (String)val.get(0);
+            String inpKey = (String)actualVal.get(0);
             assertEquals(0, inpKey.compareTo((String)t.get(1)));
             ++size;
         }
@@ -127,10 +141,23 @@
         int size=0;
         for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
             Tuple t = (Tuple)res.result;
+            Tuple key = (Tuple)t.get(1);
             Tuple val = (Tuple)t.get(2);
+            
+            // The input data has 2 columns of which both
+            // are the key.
+            // With the optimized LocalRearrange, the part
+            // of the "value" present in the "key" is 
+            // excluded from the "value". So in this case, 
+            // the "value" coming out of the LocalRearrange
+            // would be an empty tuple
+            assertEquals(0, val.size());
+            
             //Check if the index is same as input index
             assertEquals((byte)0, (byte)(Byte)t.get(0));
             
+            // reconstruct value from the key (both input columns are in the key)
+            val = key;
             //Check if the input baf contains the value tuple
             assertTrue(TestHelper.bagContains(db, val));
             

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Thu Oct  9 12:23:51 2008
@@ -30,7 +30,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
 import org.apache.pig.data.BagFactory;
@@ -78,7 +78,7 @@
         p1.add(prj1);
         List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
         in1.add(p1);
-        POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+        POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
                 .nextLong()), -1, inputs1);
         lr1.setPlans(in1);
         lr1.setIndex(0);
@@ -92,7 +92,7 @@
         p2.add(prj2);
         List<PhysicalPlan> in2 = new LinkedList<PhysicalPlan>();
         in2.add(p2);
-        POLocalRearrange lr2 = new POLocalRearrange(new OperatorKey("", r
+        POLocalRearrangeForIllustrate lr2 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
                 .nextLong()), -1, inputs2);
         lr2.setPlans(in2);
         lr2.setIndex(1);
@@ -172,7 +172,7 @@
         p1.add(prj1);
         List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
         in1.add(p1);
-        POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+        POLocalRearrangeForIllustrate lr1 = new POLocalRearrangeForIllustrate(new OperatorKey("", r
                 .nextLong()), -1, inputs1);
         lr1.setPlans(in1);
         lr1.setIndex(0);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=703233&r1=703232&r2=703233&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Thu Oct  9 12:23:51 2008
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
@@ -79,6 +81,18 @@
         pop.setInner(inner);
         PigNullableWritable k = HDataType.getWritableComparableTypes(key, (byte)0);
         pop.attachInput(k, db.iterator());
+        
+        // we are not doing any optimization to remove
+        // parts of the "value" which are present in the "key" in this
+        // unit test - so set up the "keyInfo" accordingly in 
+        // the POPackage
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = 
+            new HashMap<Integer, Pair<Boolean, Map<Integer,Integer>>>();
+        Pair<Boolean, Map<Integer, Integer>> p = 
+            new Pair<Boolean, Map<Integer, Integer>>(false, new HashMap<Integer, Integer>());
+        keyInfo.put(0, p);
+        keyInfo.put(1, p);
+        pop.setKeyInfo(keyInfo);
         Tuple t = null;
         Result res = null;
         res = (Result) pop.getNext(t);