You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/20 22:42:38 UTC
svn commit: r827786 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/io/ test/org/apache/pig/test/
Author: pradeepkth
Date: Tue Oct 20 20:42:37 2009
New Revision: 827786
URL: http://svn.apache.org/viewvc?rev=827786&view=rev
Log:
PIG-976: Multi-query optimization throws ClassCastException (rding via pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Oct 20 20:42:37 2009
@@ -63,6 +63,9 @@
BUG FIXES
+PIG-976: Multi-query optimization throws ClassCastException (rding via
+pradeepkth)
+
PIG-858: Order By followed by "replicated" join fails while compiling MR-plan
from physical plan (ashutoshc via gates)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Oct 20 20:42:37 2009
@@ -106,6 +106,11 @@
DISTINCT };
private int mKeyField = -1;
+
+ // This array tracks the positions of the group key in the output tuples
+ // of the foreach clause. This needs to be revisited when combiner optimizer
+ // supports foreach output with parts of group key (e.g. group.$0).
+ private boolean[] keyFieldPositions;
private byte mKeyType = 0;
@@ -247,7 +252,7 @@
// as it needs to act differently than the regular
// package operator.
POCombinerPackage combinePack =
- new POCombinerPackage(pack, bags);
+ new POCombinerPackage(pack, bags, keyFieldPositions);
mr.combinePlan.add(combinePack);
mr.combinePlan.add(cfe);
mr.combinePlan.connect(combinePack, cfe);
@@ -282,7 +287,7 @@
// be the POCombiner package, as it needs to act
// differently than the regular package operator.
POCombinerPackage newReducePack =
- new POCombinerPackage(pack, bags);
+ new POCombinerPackage(pack, bags, keyFieldPositions);
mr.reducePlan.replace(pack, newReducePack);
// the replace() above only changes
@@ -360,6 +365,7 @@
List<ExprType> types = new ArrayList<ExprType>(plans.size());
boolean atLeastOneAlgebraic = false;
boolean noNonAlgebraics = true;
+ keyFieldPositions = new boolean[plans.size()];
for (int i = 0; i < plans.size(); i++) {
ExprType t = algebraic(plans.get(i), flattens.get(i), i);
types.add(t);
@@ -412,6 +418,7 @@
if (cols != null && cols.size() == 1 && cols.get(0) == 0 &&
pp.getPredecessors(proj) == null) {
mKeyField = field;
+ keyFieldPositions[field] = true;
mKeyType = proj.getResultType();
} else {
// It can't be a flatten except on the grouping column
@@ -525,6 +532,8 @@
addKeyProject(mfe);
addKeyProject(cfe);
mKeyField = cPlans.size() - 1;
+ keyFieldPositions = new boolean[cPlans.size()];
+ keyFieldPositions[mKeyField] = true;
}
// Change the plans on the reduce/combine foreach to project from the column
@@ -925,5 +934,6 @@
private void resetState() {
+ // Restore the key-tracking fields to their initial "no key found" values:
+ // mKeyField = -1, mKeyType = 0, keyFieldPositions = null. Presumably called
+ // between plans so state from one plan does not leak into the next -- confirm
+ // against the callers.
mKeyField = -1;
mKeyType = 0;
+ keyFieldPositions = null;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Tue Oct 20 20:42:37 2009
@@ -600,9 +600,11 @@
pkg.addPackage(pk);
}
+ boolean[] keyPos = pk.getKeyPositionsInTuple();
+
PODemux demux = (PODemux)to.getLeaves().get(0);
for (int i=initial; i<current; i++) {
- demux.addPlan(from, mapKeyType);
+ demux.addPlan(from, mapKeyType, keyPos);
}
if (demux.isSameMapKeyType()) {
@@ -752,11 +754,13 @@
pkg.setKeyType(cpk.getKeyType());
+ boolean[] keyPos = cpk.getKeyPositionsInTuple();
+
// See comment above for why we replicated the Package
// in the from plan - for the same reason, we replicate
// the Demux operators now.
for (int i=initial; i<current; i++) {
- demux.addPlan(from, mapKeyType);
+ demux.addPlan(from, mapKeyType, keyPos);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Tue Oct 20 20:42:37 2009
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -58,9 +59,12 @@
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
private boolean[] mBags; // For each field, indicates whether or not it
-
- private Map<Integer, Integer> keyLookup;
// needs to be put in a bag.
+
+ private boolean[] keyPositions;
+
+ private Map<Integer, Integer> keyLookup;
+
/**
* A new POPostCombinePackage will be constructed as a near clone of the
@@ -68,8 +72,10 @@
* @param pkg POPackage to clone.
* @param bags for each field, indicates whether it should be a bag (true)
* or a simple field (false).
+ * @param keyPos for each field in the output tuple of the foreach operator,
+ * indicates whether it's the group key.
*/
- public POCombinerPackage(POPackage pkg, boolean[] bags) {
+ public POCombinerPackage(POPackage pkg, boolean[] bags, boolean[] keyPos) {
super(new OperatorKey(pkg.getOperatorKey().scope,
NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
pkg.getRequestedParallelism(), pkg.getInputs());
@@ -80,7 +86,12 @@
for (int i = 0; i < pkg.inner.length; i++) {
inner[i] = true;
}
- mBags = bags;
+ if (bags != null) {
+ mBags = Arrays.copyOf(bags, bags.length);
+ }
+ if (keyPos != null) {
+ keyPositions = Arrays.copyOf(keyPos, keyPos.length);
+ }
}
@Override
@@ -129,7 +140,7 @@
// the value (tup) that we have currently
for(int i = 0; i < mBags.length; i++) {
Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
+ if(keyIndex == null && mBags[i]) {
// the field for this index is not the
// key - so just take it from the "value"
// we were handed - Currently THIS HAS TO BE A BAG
@@ -159,5 +170,10 @@
return r;
}
+
+ /**
+ * Get the field positions of the group key in the output tuples of this
+ * package. Entry i is true when field i holds the group key.
+ *
+ * If this operator was constructed without key-position information
+ * (a null keyPos), the plain POPackage layout -- key in field 0 only --
+ * is reported instead of failing with a NullPointerException.
+ *
+ * @return a new array flagging which output fields hold the group key;
+ *         the caller may modify it freely.
+ */
+ @Override
+ public boolean[] getKeyPositionsInTuple() {
+ if (keyPositions == null) {
+ // keyPositions is only set when the constructor got a non-null keyPos.
+ return super.getKeyPositionsInTuple();
+ }
+ return keyPositions.clone();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Tue Oct 20 20:42:37 2009
@@ -23,7 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -86,6 +85,15 @@
* to "unwrap" the tuple to get to the key
*/
private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+ /**
+ * The list tracks the field position of the key in the input tuple so that
+ * the right values are "unwrapped" to get the key.
+ * The tuples emitted from POCombinerPackages always have keys in a fixed
+ * position, but this position varies depending on the Pig Latin scripts.
+ */
+ private ArrayList<boolean[]> keyPositions = new ArrayList<boolean[]>();
+
/*
* Flag indicating when a new pull should start
*/
@@ -215,13 +223,14 @@
*
* @param inPlan plan to be appended to the inner plan list
*/
- public void addPlan(PhysicalPlan inPlan, byte mapKeyType) {
+ public void addPlan(PhysicalPlan inPlan, byte mapKeyType, boolean[] keyPos) {
+ // keyPos: for each field of the tuples routed to inPlan, whether that
+ // field holds the group key (see the keyPositions list). It is stored
+ // by reference, not copied.
myPlans.add(inPlan);
processedSet.set(myPlans.size()-1);
// if mapKeyType is already a tuple, we will NOT
// be wrapping it in an extra tuple. If it is not
// a tuple, we will wrap into in a tuple
isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+ // Indexed in parallel with myPlans/isKeyWrapped; attachInputWithIndex()
+ // looks it up by plan index to decide which fields to unwrap.
+ keyPositions.add(keyPos);
}
@Override
@@ -339,24 +348,37 @@
private PhysicalOperator attachInputWithIndex(Tuple res) throws ExecException {
- // unwrap the key to get the wrapped value which
- // is expected by the inner plans
- PigNullableWritable key = (PigNullableWritable)res.get(0);
+ // unwrap the first field of the tuple to get the wrapped value which
+ // is expected by the inner plans, as well as the index of the associated
+ // inner plan.
+ PigNullableWritable fld = (PigNullableWritable)res.get(0);
// choose an inner plan to run based on the index set by
// the POLocalRearrange operator and passed to this operator
// by POMultiQueryPackage
- int index = key.getIndex();
+ int index = fld.getIndex();
index &= idxPart;
index -= baseIndex;
PhysicalPlan pl = myPlans.get(index);
if (!(pl.getRoots().get(0) instanceof PODemux)) {
if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
- Tuple tup = (Tuple)key.getValueAsPigType();
- res.set(0, tup.get(0));
+
+ // unwrap the keys
+ boolean[] keys = keyPositions.get(index);
+ for (int pos = 0; pos < keys.length; pos++) {
+ if (keys[pos]) {
+ Tuple tup = (pos == 0) ?
+ (Tuple)fld.getValueAsPigType() : (Tuple)res.get(pos);
+ res.set(pos, tup.get(0));
+ }
+ else if (pos == 0) {
+ res.set(0, fld.getValueAsPigType());
+ }
+ }
+
} else {
- res.set(0, key.getValueAsPigType());
+ res.set(0, fld.getValueAsPigType());
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Tue Oct 20 20:42:37 2009
@@ -30,9 +30,11 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.HDataType;
/**
* The package operator that packages the globally rearranged tuples
@@ -168,7 +170,9 @@
@Override
public Result getNext(Tuple t) throws ExecException {
- int index = myKey.getIndex();
+ byte origIndex = myKey.getIndex();
+
+ int index = (int)origIndex;
index &= idxPart;
index -= baseIndex;
@@ -187,18 +191,33 @@
Tuple tuple = (Tuple)res.result;
- // the key present in the first field
- // of the tuple above is the real key without
+ // the object present in the first field
+ // of the tuple above is the real data without
// index information - this is because the
- // package above, extracts the real key out of
- // the PigNullableWritable key - we are going to
+ // package above, extracts the real data out of
+ // the PigNullableWritable object - we are going to
// give this result tuple to a PODemux operator
- // which needs a PigNullableWritable key so
- // it can figure out the index - we already have
- // the PigNullableWritable key cachec in "myKey"
- // let's send this in the result tuple
- tuple.set(0, myKey);
-
+ // which needs a PigNullableWritable first field so
+ // it can figure out the index. Therefore we need
+ // to add index to the first field of the tuple.
+
+ Object obj = tuple.get(0);
+ if (obj instanceof PigNullableWritable) {
+ ((PigNullableWritable)obj).setIndex(origIndex);
+ }
+ else {
+ PigNullableWritable myObj = null;
+ if (obj == null) {
+ myObj = new NullableUnknownWritable();
+ myObj.setNull(true);
+ }
+ else {
+ myObj = HDataType.getWritableComparableTypes(obj, (byte)0);
+ }
+ myObj.setIndex(origIndex);
+ tuple.set(0, myObj);
+ }
+
return res;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Oct 20 20:42:37 2009
@@ -60,7 +60,14 @@
*
*/
private static final long serialVersionUID = 1L;
+
+ private static boolean[] SIMPLE_KEY_POSITION;
+ static {
+ SIMPLE_KEY_POSITION = new boolean[1];
+ SIMPLE_KEY_POSITION[0] = true;
+ }
+
//The iterator of indexed Tuples
//that is typically provided by
//Hadoop
@@ -328,6 +335,17 @@
}
/**
+ * Get the field positions of the group key in the output tuples.
+ * Entry i of the returned array is true when field i of the output tuple
+ * holds the group key; the array may be shorter than the tuple. For
+ * POPackage the key is always (and only) field 0, so this returns {true}.
+ * POCombinerPackage overrides this to report other layouts.
+ *
+ * @return a new array (a clone, safe for the caller to modify) flagging
+ *         which output fields hold the group key.
+ */
+ public boolean[] getKeyPositionsInTuple() {
+ return SIMPLE_KEY_POSITION.clone();
+ }
+
+ /**
* Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java?rev=827786&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java Tue Oct 20 20:42:37 2009
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link PigNullableWritable} for values whose data type is not known.
+ * It wraps whatever {@link WritableComparable} it is given and returns it
+ * as-is, without any type conversion.
+ */
+public class NullableUnknownWritable extends PigNullableWritable {
+
+ /**
+ * Creates an instance without assigning a wrapped value.
+ */
+ public NullableUnknownWritable() {
+ super();
+ }
+
+ /**
+ * Creates an instance that wraps the given value.
+ *
+ * @param u the value to wrap
+ */
+ public NullableUnknownWritable(WritableComparable<?> u) {
+ this.mValue = u;
+ }
+
+ /**
+ * @return null if this object is marked null, otherwise the wrapped
+ *         value unchanged.
+ */
+ @Override
+ public Object getValueAsPigType() {
+ if (isNull()) {
+ return null;
+ }
+ return mValue;
+ }
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=827786&r1=827785&r2=827786&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct 20 20:42:37 2009
@@ -18,7 +18,10 @@
package org.apache.pig.test;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,6 +45,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -56,6 +60,7 @@
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.jobcontrol.Job;
public class TestMultiQuery extends TestCase {
@@ -107,8 +112,204 @@
Assert.fail();
}
}
+
+ @Test
+ public void testMultiQueryJiraPig976() throws Exception {
+
+ // test case: key ('group') isn't part of foreach output
+ // and keys have the same type.
+
+ runJiraPig976Batch(
+ "b = group a by uid;",
+ "c = group a by gid;",
+ "d = foreach b generate SUM(a.gid);",
+ "e = foreach c generate group, COUNT(a);",
+ "store d into '/tmp/output1';",
+ "store e into '/tmp/output2';");
+ }
+
+ @Test
+ public void testMultiQueryJiraPig976_2() throws Exception {
+
+ // test case: key ('group') isn't part of foreach output
+ // and keys have different types
+
+ runJiraPig976Batch(
+ "b = group a by uname;",
+ "c = group a by gid;",
+ "d = foreach b generate SUM(a.gid);",
+ "e = foreach c generate group, COUNT(a);",
+ "store d into '/tmp/output1';",
+ "store e into '/tmp/output2';");
+ }
+
+ @Test
+ public void testMultiQueryJiraPig976_3() throws Exception {
+
+ // test case: group all and key ('group') isn't part of output
+
+ runJiraPig976Batch(
+ "b = group a all;",
+ "c = group a by gid;",
+ "d = foreach b generate SUM(a.gid);",
+ "e = foreach c generate group, COUNT(a);",
+ "store d into '/tmp/output1';",
+ "store e into '/tmp/output2';");
+ }
+
+ @Test
+ public void testMultiQueryJiraPig976_4() throws Exception {
+
+ // test case: group by multi-cols and key ('group') isn't part of output
+
+ runJiraPig976Batch(
+ "b = group a by uid;",
+ "c = group a by (uname, gid);",
+ "d = foreach b generate SUM(a.gid);",
+ "e = foreach c generate group.uname, group.gid, COUNT(a);",
+ "store d into '/tmp/output1';",
+ "store e into '/tmp/output2';");
+ }
+
+ @Test
+ public void testMultiQueryJiraPig976_5() throws Exception {
+
+ // test case: key ('group') in multiple positions.
+
+ runJiraPig976Batch(
+ "b = group a by uid;",
+ "c = group a by (uname, gid);",
+ "d = foreach b generate SUM(a.gid), group, group;",
+ "d1 = foreach d generate $1 + $2;",
+ "e = foreach c generate group, COUNT(a);",
+ "store d1 into '/tmp/output1';",
+ "store e into '/tmp/output2';");
+ }
+
+ /**
+ * Shared driver for the PIG-976 regression tests.
+ *
+ * It turns on batch mode, loads the passwd test data as relation 'a', and
+ * registers the given queries in order. It then executes the batch and
+ * asserts that every resulting job completed.
+ *
+ * Exceptions are propagated, rather than printed and replaced by a bare
+ * fail(), so the test report shows the real cause.
+ *
+ * @param queries Pig Latin statements to register after the load of 'a'
+ * @throws Exception if registering or executing any query fails
+ */
+ private void runJiraPig976Batch(String... queries) throws Exception {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ for (String query : queries) {
+ myPig.registerQuery(query);
+ }
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+ }
+ }
@Test
+ public void testMultiQueryJiraPig976_6() throws Exception {
+
+ // test case: key ('group') has null values.
+
+ String INPUT_FILE = "pig-976.txt";
+
+ try {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\tapple\t100\t10");
+ w.println("apple\tapple\t\t20");
+ w.println("orange\torange\t100\t10");
+ w.println("orange\torange\t\t20");
+ w.println("strawberry\tstrawberry\t300\t10");
+ w.close();
+ // PrintWriter swallows I/O errors; surface them instead of running
+ // the queries against a truncated input file.
+ assertFalse("failed to write " + INPUT_FILE, w.checkError());
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load '" + INPUT_FILE +
+ "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uid;");
+ myPig.registerQuery("c = group a by gid;");
+ myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate COUNT(a), group;");
+ myPig.registerQuery("store d into '/tmp/output1';");
+ myPig.registerQuery("store e into '/tmp/output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(2, jobs.size());
+
+ for (ExecJob job : jobs) {
+ assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+ }
+ } finally {
+ // Cleanup is best-effort: a failure here is reported, but it must not
+ // replace (mask) an exception or assertion failure from the test body.
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Test
public void testMultiQueryWithTwoStores2() {
System.out.println("===== multi-query with 2 stores (2) =====");