You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/20 22:42:38 UTC

svn commit: r827786 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/io/ test/org/apache/pig/test/

Author: pradeepkth
Date: Tue Oct 20 20:42:37 2009
New Revision: 827786

URL: http://svn.apache.org/viewvc?rev=827786&view=rev
Log:
PIG-976: Multi-query optimization throws ClassCastException (rding via pradeepkth)

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct 20 20:42:37 2009
@@ -63,6 +63,9 @@
 
 BUG FIXES
 
+PIG-976: Multi-query optimization throws ClassCastException (rding via
+pradeepkth)
+
 PIG-858: Order By followed by "replicated" join fails while compiling MR-plan
 from physical plan (ashutoshc via gates)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Oct 20 20:42:37 2009
@@ -106,6 +106,11 @@
         DISTINCT };
 
     private int mKeyField = -1;
+    
+    // This array marks which fields of the foreach output tuple hold the
+    // group key. It needs to be revisited once the combiner optimizer
+    // supports foreach output with parts of the group key (e.g. group.$0).
+    private boolean[] keyFieldPositions;
 
     private byte mKeyType = 0;
     
@@ -247,7 +252,7 @@
 					// as it needs to act differently than the regular
 					// package operator.
                     POCombinerPackage combinePack =
-                        new POCombinerPackage(pack, bags);
+                        new POCombinerPackage(pack, bags, keyFieldPositions);
                     mr.combinePlan.add(combinePack);
                     mr.combinePlan.add(cfe);
                     mr.combinePlan.connect(combinePack, cfe);
@@ -282,7 +287,7 @@
                     // be the POCombiner package, as it needs to act
                     // differently than the regular package operator.
                     POCombinerPackage newReducePack =
-                        new POCombinerPackage(pack, bags);
+                        new POCombinerPackage(pack, bags, keyFieldPositions);
                     mr.reducePlan.replace(pack, newReducePack);
                     
                     // the replace() above only changes
@@ -360,6 +365,7 @@
         List<ExprType> types = new ArrayList<ExprType>(plans.size());
         boolean atLeastOneAlgebraic = false;
         boolean noNonAlgebraics = true;
+        keyFieldPositions = new boolean[plans.size()];
         for (int i = 0; i < plans.size(); i++) {
             ExprType t = algebraic(plans.get(i), flattens.get(i), i);
             types.add(t);
@@ -412,6 +418,7 @@
             if (cols != null && cols.size() == 1 && cols.get(0) == 0 &&
                     pp.getPredecessors(proj) == null) {
                 mKeyField = field;
+                keyFieldPositions[field] = true;
                 mKeyType = proj.getResultType();
             } else {
                 // It can't be a flatten except on the grouping column
@@ -525,6 +532,8 @@
             addKeyProject(mfe);
             addKeyProject(cfe);
             mKeyField = cPlans.size() - 1;
+            keyFieldPositions = new boolean[cPlans.size()];
+            keyFieldPositions[mKeyField] = true;
         }
 
         // Change the plans on the reduce/combine foreach to project from the column
@@ -925,5 +934,6 @@
     private void resetState() {
         mKeyField = -1;
         mKeyType = 0;
+        keyFieldPositions = null;
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Tue Oct 20 20:42:37 2009
@@ -600,9 +600,11 @@
             pkg.addPackage(pk);
         }
         
+        boolean[] keyPos = pk.getKeyPositionsInTuple();
+        
         PODemux demux = (PODemux)to.getLeaves().get(0);
         for (int i=initial; i<current; i++) {
-            demux.addPlan(from, mapKeyType);
+            demux.addPlan(from, mapKeyType, keyPos);
         }
                
         if (demux.isSameMapKeyType()) {
@@ -752,11 +754,13 @@
         
         pkg.setKeyType(cpk.getKeyType());
         
+        boolean[] keyPos = cpk.getKeyPositionsInTuple();
+        
         // See comment above for why we replicated the Package
         // in the from plan - for the same reason, we replicate
         // the Demux operators now.
         for (int i=initial; i<current; i++) {
-            demux.addPlan(from, mapKeyType);
+            demux.addPlan(from, mapKeyType, keyPos);
         }
     }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Tue Oct 20 20:42:37 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -58,9 +59,12 @@
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     private boolean[] mBags; // For each field, indicates whether or not it
-
-    private Map<Integer, Integer> keyLookup;
                              // needs to be put in a bag.
+    
+    private boolean[] keyPositions;
+    
+    private Map<Integer, Integer> keyLookup;
+                             
 
     /**
      * A new POPostCombinePackage will be constructed as a near clone of the
@@ -68,8 +72,10 @@
      * @param pkg POPackage to clone.
      * @param bags for each field, indicates whether it should be a bag (true)
      * or a simple field (false).
+     * @param keyPos for each field in the output tuple of the foreach operator, 
+     * indicates whether it's the group key.
      */
-    public POCombinerPackage(POPackage pkg, boolean[] bags) {
+    public POCombinerPackage(POPackage pkg, boolean[] bags, boolean[] keyPos) {
         super(new OperatorKey(pkg.getOperatorKey().scope,
             NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
             pkg.getRequestedParallelism(), pkg.getInputs());
@@ -80,7 +86,12 @@
         for (int i = 0; i < pkg.inner.length; i++) {
             inner[i] = true;
         }
-        mBags = bags;
+        if (bags != null) {
+            mBags = Arrays.copyOf(bags, bags.length);
+        }
+        if (keyPos != null) {
+            keyPositions = Arrays.copyOf(keyPos, keyPos.length);
+        }
     }
 
     @Override
@@ -129,7 +140,7 @@
                               // the value (tup) that we have currently
             for(int i = 0; i < mBags.length; i++) {
                 Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
+                if(keyIndex == null && mBags[i]) {
                     // the field for this index is not the
                     // key - so just take it from the "value"
                     // we were handed - Currently THIS HAS TO BE A BAG
@@ -159,5 +170,10 @@
         return r;
 
     }
+    
+    @Override
+    public boolean[] getKeyPositionsInTuple() {
+        return keyPositions.clone();
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Tue Oct 20 20:42:37 2009
@@ -23,7 +23,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -86,6 +85,15 @@
      * to "unwrap" the tuple to get to the key
      */
     private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+    
+    /**
+     * For each inner plan, a boolean array marking which fields of the input
+     * tuple hold the key, so that the right values are "unwrapped" to get the key.
+     * The tuples emitted by a POCombinerPackage always have keys in fixed
+     * positions, but those positions vary with the Pig Latin script.
+     */
+    private ArrayList<boolean[]> keyPositions = new ArrayList<boolean[]>();
+    
     /*
      * Flag indicating when a new pull should start 
      */
@@ -215,13 +223,14 @@
      * 
      * @param inPlan plan to be appended to the inner plan list
      */
-    public void addPlan(PhysicalPlan inPlan, byte mapKeyType) {  
+    public void addPlan(PhysicalPlan inPlan, byte mapKeyType, boolean[] keyPos) {  
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
         // if mapKeyType is already a tuple, we will NOT
         // be wrapping it in an extra tuple. If it is not
         // a tuple, we will wrap into in a tuple
         isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+        keyPositions.add(keyPos);
     }
    
     @Override
@@ -339,24 +348,37 @@
     
     private PhysicalOperator attachInputWithIndex(Tuple res) throws ExecException {
         
-        // unwrap the key to get the wrapped value which
-        // is expected by the inner plans
-        PigNullableWritable key = (PigNullableWritable)res.get(0);        
+        // unwrap the first field of the tuple to get the wrapped value which
+        // is expected by the inner plans, as well as the index of the associated
+        // inner plan.
+        PigNullableWritable fld = (PigNullableWritable)res.get(0);        
     
         // choose an inner plan to run based on the index set by
         // the POLocalRearrange operator and passed to this operator
         // by POMultiQueryPackage
-        int index = key.getIndex();
+        int index = fld.getIndex();
         index &= idxPart;
         index -= baseIndex;                         
         
         PhysicalPlan pl = myPlans.get(index);
         if (!(pl.getRoots().get(0) instanceof PODemux)) {                             
             if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {                                       
-                Tuple tup = (Tuple)key.getValueAsPigType();
-                res.set(0, tup.get(0));
+                
+                // unwrap the keys
+                boolean[] keys = keyPositions.get(index);
+                for (int pos = 0; pos < keys.length; pos++) {
+                    if (keys[pos]) {
+                        Tuple tup = (pos == 0) ? 
+                                (Tuple)fld.getValueAsPigType() : (Tuple)res.get(pos);
+                        res.set(pos, tup.get(0));
+                    } 
+                    else if (pos == 0) {                 
+                        res.set(0, fld.getValueAsPigType());
+                    }
+                }
+                
             } else {
-                res.set(0, key.getValueAsPigType());
+                res.set(0, fld.getValueAsPigType());
             }
         }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Tue Oct 20 20:42:37 2009
@@ -30,9 +30,11 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.HDataType;
 
 /**
  * The package operator that packages the globally rearranged tuples 
@@ -168,7 +170,9 @@
     @Override
     public Result getNext(Tuple t) throws ExecException {
         
-        int index = myKey.getIndex();
+        byte origIndex = myKey.getIndex();
+
+        int index = (int)origIndex;
         index &= idxPart;
         index -= baseIndex;
         
@@ -187,18 +191,33 @@
         
         Tuple tuple = (Tuple)res.result;
 
-        // the key present in the first field
-        // of the tuple above is the real key without
+        // the object present in the first field
+        // of the tuple above is the real data without
         // index information - this is because the
-        // package above, extracts the real key out of
-        // the PigNullableWritable key - we are going to
+        // package above, extracts the real data out of
+        // the PigNullableWritable object - we are going to
         // give this result tuple to a PODemux operator
-        // which needs a PigNullableWritable key so
-        // it can figure out the index - we already have
-        // the PigNullableWritable key cachec in "myKey"
-        // let's send this in the result tuple
-        tuple.set(0, myKey);
-
+        // which needs a PigNullableWritable first field so
+        // it can figure out the index. Therefore we need
+        // to set the index on the first field of the tuple.
+                
+        Object obj = tuple.get(0);
+        if (obj instanceof PigNullableWritable) {
+            ((PigNullableWritable)obj).setIndex(origIndex);
+        }
+        else {
+            PigNullableWritable myObj = null;
+            if (obj == null) {
+                myObj = new NullableUnknownWritable();
+                myObj.setNull(true);
+            }
+            else {
+                myObj = HDataType.getWritableComparableTypes(obj, (byte)0);
+            }
+            myObj.setIndex(origIndex);
+            tuple.set(0, myObj);
+        }
+        
         return res;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Oct 20 20:42:37 2009
@@ -60,7 +60,14 @@
      * 
      */
     private static final long serialVersionUID = 1L;
+    
+    private static boolean[] SIMPLE_KEY_POSITION; 
 
+    static {
+        SIMPLE_KEY_POSITION = new boolean[1];
+        SIMPLE_KEY_POSITION[0] = true;
+    }
+    
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
@@ -328,6 +335,17 @@
     }
 
     /**
+     * Get the field positions of the key in the output tuples.
+     * For POPackage, the key is always at position 0. The POCombinerPackage,
+     * however, can return different values.
+     *
+     * @return boolean flags, true at each field position that holds the key.
+     */
+    public boolean[] getKeyPositionsInTuple() {
+        return SIMPLE_KEY_POSITION.clone();
+    }
+    
+    /**
      * Make a deep copy of this operator.  
      * @throws CloneNotSupportedException
      */

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java?rev=827786&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java Tue Oct 20 20:42:37 2009
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class can be used when the data type is 'Unknown' and
+ * a PigNullableWritable object is needed.
+ */
+public class NullableUnknownWritable extends PigNullableWritable {
+
+    public NullableUnknownWritable() {
+    }
+    
+    public NullableUnknownWritable(WritableComparable<?> u) {
+        mValue = u;
+    }
+
+    @Override
+    public Object getValueAsPigType() {
+        return isNull() ? null : mValue;
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct 20 20:42:37 2009
@@ -18,7 +18,10 @@
 package org.apache.pig.test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -42,6 +45,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -56,6 +60,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.jobcontrol.Job;
 
 public class TestMultiQuery extends TestCase {
 
@@ -107,8 +112,204 @@
             Assert.fail();
         } 
     }         
+
+    @Test
+    public void testMultiQueryJiraPig976() {
+
+        // test case: key ('group') isn't part of foreach output
+        // and keys have the same type.
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a by uid;");
+            myPig.registerQuery("c = group a by gid;");
+            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryJiraPig976_2() {
+
+        // test case: key ('group') isn't part of foreach output 
+        // and keys have different types
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a by uname;");
+            myPig.registerQuery("c = group a by gid;");
+            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryJiraPig976_3() {
+
+        // test case: group all and key ('group') isn't part of output
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a all;");
+            myPig.registerQuery("c = group a by gid;");
+            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryJiraPig976_4() {
+
+        // test case: group by multi-cols and key ('group') isn't part of output
+         
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a by uid;");
+            myPig.registerQuery("c = group a by (uname, gid);");
+            myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+            myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+   
+    @Test
+    public void testMultiQueryJiraPig976_5() {
+
+        // test case: key ('group') in multiple positions.
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a by uid;");
+            myPig.registerQuery("c = group a by (uname, gid);");
+            myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group;");
+            myPig.registerQuery("d1 = foreach d generate $1 + $2;");
+            myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+            myPig.registerQuery("store d1 into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
     
     @Test
+    public void testMultiQueryJiraPig976_6() {
+
+        // test case: key ('group') has null values.
+
+        String INPUT_FILE = "pig-976.txt";
+        
+        try {
+            
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\tapple\t100\t10");
+            w.println("apple\tapple\t\t20");
+            w.println("orange\torange\t100\t10");
+            w.println("orange\torange\t\t20");
+            w.println("strawberry\tstrawberry\t300\t10");
+   
+            w.close();
+            
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load '" + INPUT_FILE +
+                                "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = group a by uid;");
+            myPig.registerQuery("c = group a by gid;");
+            myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
+            myPig.registerQuery("e = foreach c generate COUNT(a), group;");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
+            
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+            
+        }
+    }    
+
+    @Test
     public void testMultiQueryWithTwoStores2() {
 
         System.out.println("===== multi-query with 2 stores (2) =====");