You are viewing a plain-text version of this content. (The canonical link for it, originally attached to the word "here", was lost in this plain-text copy.)
Posted to commits@pig.apache.org by ut...@apache.org on 2007/12/13 23:08:09 UTC
svn commit: r604050 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/impl/eval/ProjectSpec.java
src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
test/org/apache/pig/test/TestAlgebraicEval.java
Author: utkarsh
Date: Thu Dec 13 14:08:07 2007
New Revision: 604050
URL: http://svn.apache.org/viewvc?rev=604050&view=rev
Log:
PIG-51: Fixed combiner in the presence of flattening
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
incubator/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=604050&r1=604049&r2=604050&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Dec 13 14:08:07 2007
@@ -56,3 +56,5 @@
PIG-39: created more efficient version of read (spullara via olgan)
PIG-41: Added patterns to svn:ignore
+
+ PIG-51: Fixed combiner in the presence of flattening
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java?rev=604050&r1=604049&r2=604050&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java Thu Dec 13 14:08:07 2007
@@ -100,6 +100,8 @@
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[");
+ if (isFlattened)
+ sb.append("FLATTEN ");
sb.append("PROJECT ");
boolean first = true;
for (int i: cols){
Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=604050&r1=604049&r2=604050&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Thu Dec 13 14:08:07 2007
@@ -400,12 +400,26 @@
private class CombineAdjuster extends EvalSpecVisitor {
private int position = 0;
+
+ //We don't want to be performing any flattening in the combiner since the column numbers in
+ //the reduce spec assume that there is no combiner. If the combiner performs flattening, the column
+ //numbers get messed up. For now, since combiner works only with generate group, func1(), func2(),...,
+ //it suffices to write visitors for those eval spec types.
public void visitFuncEval(FuncEvalSpec fe) {
// Reset the function to call the initial instance of itself
// instead of the general instance.
fe.resetFuncToInitial();
+ fe.setFlatten(false);
}
+
+
+ @Override
+ public void visitProject(ProjectSpec p) {
+ p.setFlatten(false);
+ }
+
+
}
private class CombineDeterminer extends EvalSpecVisitor {
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=604050&r1=604049&r2=604050&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Thu Dec 13 14:08:07 2007
@@ -33,6 +33,36 @@
public class TestAlgebraicEval extends TestCase {
private String initString = "mapreduce";
+
+ @Test
+ public void testGroupCountWithMultipleFields() throws Exception {
+ int LOOP_COUNT = 1024;
+ PigServer pig = new PigServer(initString);
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println(i + "\t" + i + "\t" + j%2);
+ }
+ }
+ ps.close();
+ String query = "myid = foreach (group (load 'file:" + tmpFile + "') all) generate group, COUNT($1) ;";
+ System.out.println(query);
+ pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by ($0,$1);");
+ pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
+ Iterator<Tuple> it = pig.openIterator("b");
+ tmpFile.delete();
+ int count = 0;
+ while(it.hasNext()){
+ int sum = it.next().getAtomField(2).numval().intValue();
+ assertEquals(LOOP_COUNT/2, sum);
+ count++;
+ }
+ assertEquals(count, LOOP_COUNT);
+ }
+
+
+
@Test
public void testSimpleCount() throws Exception {
int LOOP_COUNT = 1024;
@@ -72,6 +102,8 @@
Double count = t.getAtomField(1).numval();
assertEquals(count, (double)LOOP_COUNT);
}
+
+
@Test
public void testGroupReorderCount() throws Exception {