You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/25 00:55:24 UTC
svn commit: r707776 - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
test/org/apache/pig/test/
Author: olga
Date: Fri Oct 24 15:55:24 2008
New Revision: 707776
URL: http://svn.apache.org/viewvc?rev=707776&view=rev
Log:
PIG-508: problems with double joins
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Oct 24 15:55:24 2008
@@ -301,3 +301,5 @@
PIG-499: parser issue with as (sms via olgan)
PIG-507: permission error not reported (pradeepk via olgan)
+
+ PIG-508: problem with double joins (pradeepk via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 24 15:55:24 2008
@@ -809,7 +809,8 @@
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
pkg.setKeyType(DataType.TUPLE);
- pkg.setNumInps(0);
+ pkg.setDistinct(true);
+ pkg.setNumInps(1);
boolean[] inner = {false};
pkg.setInner(inner);
curMROp.reducePlan.add(pkg);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Oct 24 15:55:24 2008
@@ -58,7 +58,6 @@
// OR in the reduce plan. POPostCombinerPackage could
// be present only in the reduce plan. Search in these two
// plans accordingly
-
if(!mr.combinePlan.isEmpty()) {
PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan);
pkgDiscoverer.visit();
@@ -76,7 +75,7 @@
// if the POPackage is actually a POPostCombinerPackage, then we should
// just look for the corresponding LocalRearrange(s) in the combine plan
if(pkg instanceof POPostCombinerPackage) {
- if(!patchPackage(mr.combinePlan, pkg)) {
+ if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
throw new VisitorException("Unexpected problem while trying " +
"to optimize (could not find LORearrange in combine plan)");
}
@@ -91,26 +90,31 @@
private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
// the LocalRearrange(s) could either be in the map of this MapReduceOper
// OR in the reduce of predecessor MapReduceOpers
- if(!patchPackage(mr.mapPlan, pkg)) {
+ int lrFound = 0;
+
+ lrFound = patchPackage(mr.mapPlan, pkg);
+ if(lrFound != pkg.getNumInps()) {
// we did not find the LocalRearrange(s) in the map plan
// let's look in the predecessors
List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
MapReduceOper mrOper = it.next();
- if(!patchPackage(mrOper.reducePlan, pkg)) {
- throw new VisitorException("Unexpected problem while trying " +
- "to optimize (could not find LORearrange in predecessor's reduce plan)");
+ lrFound += patchPackage(mrOper.reducePlan, pkg);
+ if(lrFound == pkg.getNumInps()) {
+ break;
}
}
}
+ if(lrFound != pkg.getNumInps())
+ throw new VisitorException("Unexpected problem while trying to optimize (Could not find all LocalRearranges)");
}
- private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
+ private int patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg);
lrDiscoverer.visit();
// let our caller know if we managed to patch
// the package
- return lrDiscoverer.isLoRearrangeFound();
+ return lrDiscoverer.getLoRearrangeFound();
}
/**
@@ -161,7 +165,7 @@
*/
class LoRearrangeDiscoverer extends PhyPlanVisitor {
- private boolean loRearrangeFound = false;
+ private int loRearrangeFound = 0;
private POPackage pkg;
public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
@@ -174,14 +178,22 @@
*/
@Override
public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
- loRearrangeFound = true;
+ loRearrangeFound++;
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
keyInfo = pkg.getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+ if(keyInfo.get(lrearrange.getIndex()) != null) {
+ // something is wrong - we should not be getting key info
+ // for the same index from two different Local Rearranges
+ throw new VisitorException("Unexpected problem while trying " +
+ "to optimize (found same index:" + lrearrange.getIndex() +
+ " in multiple Local Rearrange operators");
+ }
keyInfo.put(new Integer(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
@@ -192,7 +204,7 @@
/**
* @return the loRearrangeFound
*/
- public boolean isLoRearrangeFound() {
+ public int getLoRearrangeFound() {
return loRearrangeFound;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Oct 24 15:55:24 2008
@@ -84,6 +84,10 @@
//on a particular input
boolean[] inner;
+ // flag to denote whether there is a distinct
+ // leading to this package
+ protected boolean distinct = false;
+
// A mapping of input index to key information got from LORearrange
// for that index. The Key information is a pair of boolean, Map.
// The boolean indicates whether there is a lone project(*) in the
@@ -182,97 +186,36 @@
*/
@Override
public Result getNext(Tuple t) throws ExecException {
- //Create numInputs bags
- DataBag[] dbs = null;
- if (numInputs > 0) {
+ Tuple res;
+ if(distinct) {
+ // only set the key which has the whole
+ // tuple
+ res = mTupleFactory.newTuple(1);
+ res.set(0, key);
+ } else {
+ //Create numInputs bags
+ DataBag[] dbs = null;
dbs = new DataBag[numInputs];
for (int i = 0; i < numInputs; i++) {
dbs[i] = mBagFactory.newDefaultBag();
}
- }
-
- //For each indexed tup in the inp, sort them
- //into their corresponding bags based
- //on the index
- while (tupIter.hasNext()) {
- NullableTuple ntup = tupIter.next();
- // Need to make a copy of the value, as hadoop uses the same ntup
- // to represent each value.
- Tuple val = (Tuple)ntup.getValueAsPigType();
- /*
- Tuple copy = mTupleFactory.newTuple(val.size());
- for (int i = 0; i < val.size(); i++) {
- copy.set(i, val.get(i));
- }
- */
- Tuple copy = null;
- // The "value (val)" that we just got may not
- // be the complete "value". It may have some portions
- // in the "key" (look in POLocalRearrange for more comments)
- // If this is the case we need to stitch
- // the "value" together.
- int index = ntup.getIndex();
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(index);
- boolean isProjectStar = lrKeyInfo.first;
- Map<Integer, Integer> keyLookup = lrKeyInfo.second;
- int keyLookupSize = keyLookup.size();
-
- if( keyLookupSize > 0) {
-
- // we have some fields of the "value" in the
- // "key".
- copy = mTupleFactory.newTuple();
- int finalValueSize = keyLookupSize + val.size();
- int valIndex = 0; // an index for accessing elements from
- // the value (val) that we have currently
- for(int i = 0; i < finalValueSize; i++) {
- Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
- // the field for this index is not in the
- // key - so just take it from the "value"
- // we were handed
- copy.append(val.get(valIndex));
- valIndex++;
- } else {
- // the field for this index is in the key
- if(isKeyTuple) {
- // the key is a tuple, extract the
- // field out of the tuple
- copy.append(keyAsTuple.get(keyIndex));
- } else {
- copy.append(key);
- }
- }
- }
-
- } else if (isProjectStar) {
-
- log.info("In project star, keyAsTuple:" + keyAsTuple);
- // the whole "value" is present in the "key"
- copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-
- } else {
-
- // there is no field of the "value" in the
- // "key" - so just make a copy of what we got
- // as the "value"
- copy = mTupleFactory.newTuple(val.getAll());
-
+ //For each indexed tup in the inp, sort them
+ //into their corresponding bags based
+ //on the index
+ while (tupIter.hasNext()) {
+ NullableTuple ntup = tupIter.next();
+ int index = ntup.getIndex();
+ Tuple copy = getValueTuple(ntup, index);
+ dbs[index].add(copy);
+ if(reporter!=null) reporter.progress();
}
- if (numInputs > 0) dbs[index].add(copy);
- if(reporter!=null) reporter.progress();
- }
-
- //Construct the output tuple by appending
- //the key and all the above constructed bags
- //and return it.
- Tuple res;
- res = mTupleFactory.newTuple(numInputs+1);
- res.set(0,key);
- if (numInputs > 0) {
+ //Construct the output tuple by appending
+ //the key and all the above constructed bags
+ //and return it.
+ res = mTupleFactory.newTuple(numInputs+1);
+ res.set(0,key);
int i=-1;
for (DataBag bag : dbs) {
if(inner[++i]){
@@ -293,6 +236,73 @@
return r;
}
+ protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
+ // Need to make a copy of the value, as hadoop uses the same ntup
+ // to represent each value.
+ Tuple val = (Tuple)ntup.getValueAsPigType();
+ /*
+ Tuple copy = mTupleFactory.newTuple(val.size());
+ for (int i = 0; i < val.size(); i++) {
+ copy.set(i, val.get(i));
+ }
+ */
+
+ Tuple copy = null;
+ // The "value (val)" that we just got may not
+ // be the complete "value". It may have some portions
+ // in the "key" (look in POLocalRearrange for more comments)
+ // If this is the case we need to stitch
+ // the "value" together.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(index);
+ boolean isProjectStar = lrKeyInfo.first;
+ Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+ int keyLookupSize = keyLookup.size();
+
+ if( keyLookupSize > 0) {
+
+ // we have some fields of the "value" in the
+ // "key".
+ copy = mTupleFactory.newTuple();
+ int finalValueSize = keyLookupSize + val.size();
+ int valIndex = 0; // an index for accessing elements from
+ // the value (val) that we have currently
+ for(int i = 0; i < finalValueSize; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if(keyIndex == null) {
+ // the field for this index is not in the
+ // key - so just take it from the "value"
+ // we were handed
+ copy.append(val.get(valIndex));
+ valIndex++;
+ } else {
+ // the field for this index is in the key
+ if(isKeyTuple) {
+ // the key is a tuple, extract the
+ // field out of the tuple
+ copy.append(keyAsTuple.get(keyIndex));
+ } else {
+ copy.append(key);
+ }
+ }
+ }
+
+ } else if (isProjectStar) {
+
+ // the whole "value" is present in the "key"
+ copy = mTupleFactory.newTuple(keyAsTuple.getAll());
+
+ } else {
+
+ // there is no field of the "value" in the
+ // "key" - so just make a copy of what we got
+ // as the "value"
+ copy = mTupleFactory.newTuple(val.getAll());
+
+ }
+ return copy;
+ }
+
public byte getKeyType() {
return keyType;
}
@@ -340,5 +350,19 @@
return keyInfo;
}
+ /**
+ * @return the distinct
+ */
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ /**
+ * @param distinct the distinct to set
+ */
+ public void setDistinct(boolean distinct) {
+ this.distinct = distinct;
+ }
+
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Oct 24 15:55:24 2008
@@ -49,6 +49,7 @@
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
import junit.framework.TestCase;
@@ -659,4 +660,37 @@
assertEquals(1, t.get(2));
assertEquals(Integer.class, t.get(2).getClass());
}
+
+ @Test
+ public void testCogroupWithInputFromGroup() throws IOException, ExecException {
+ // Create input file with ascii data
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
+ "pigtester2\t10\t1.2",
+ "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
+
+ Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
+ // we will in essence be doing a group on first column and getting
+ // SUM over second column and a count for the group - store
+ // the results for the three groups above so we can check the output
+ resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
+ resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
+ resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
+
+ pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+ "as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a by name;");
+ pigServer.registerQuery("c = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+ "as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("d = cogroup b by group, c by name;");
+ pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
+ Iterator<Tuple> it = pigServer.openIterator("e");
+ for(int i = 0; i < resultMap.size(); i++) {
+ Tuple t = it.next();
+ assertEquals(true, resultMap.containsKey(t.get(0)));
+ Pair<Long, Long> output = resultMap.get(t.get(0));
+ assertEquals(output.first, t.get(1));
+ assertEquals(output.second, t.get(2));
+ }
+ }
}