You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/02 19:32:30 UTC

svn commit: r1196747 [2/2] - in /pig/trunk: ./ ivy/ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoo...

Modified: pig/trunk/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSecondarySort.java?rev=1196747&r1=1196746&r2=1196747&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSecondarySort.java Wed Nov  2 18:32:28 2011
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -462,16 +463,20 @@ public class TestSecondarySort {
         ps1.println("1\t2\t4");
         ps1.println("2\t3\t4");
         ps1.close();
+        
+        String expected[] = {
+                "(2,{(2,3,4)})",
+                "(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})"
+        };
+        
         Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), tmpFile1.getCanonicalPath());
         pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = group A by $0 parallel 2;");
         pigServer.registerQuery("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        assertTrue(iter.hasNext());
-        assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
-        assertTrue(iter.hasNext());
-        assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})"));
-        assertFalse(iter.hasNext());
+        Schema s = pigServer.dumpSchema("C");
+        
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(s));
         Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
     }
 
@@ -486,16 +491,20 @@ public class TestSecondarySort {
         ps1.println("1\t8\t4");
         ps1.println("2\t3\t4");
         ps1.close();
+        
+        String expected[] = {
+                "(2,{(2,3,4)})",
+                "(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})"
+        };
+        
         Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), tmpFile1.getCanonicalPath());
         pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = group A by $0 parallel 2;");
         pigServer.registerQuery("C = foreach B { D = order A by a1 desc; generate group, D;};");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        assertTrue(iter.hasNext());
-        assertEquals("(2,{(2,3,4)})", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})", iter.next().toString());
-        assertFalse(iter.hasNext());
+        Schema s = pigServer.dumpSchema("C");
+        
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(s));
         Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
     }
 
@@ -571,18 +580,21 @@ public class TestSecondarySort {
         ps1.println("1\t2\t4");
         ps1.println("2\t3\t4");
         ps1.close();
+        
+        String expected[] = {
+                "((1,2),{(2,3),(2,4),(2,4),(2,4)})",
+                "((1,3),{(3,4)})",
+                "((2,3),{(3,4)})"
+        };
         Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), tmpFile1.getCanonicalPath());
         pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
         pigServer.registerQuery("B = group A by (a0, a1);");
         pigServer.registerQuery("C = foreach B { C1 = A.(a1,a2); generate group, C1;};");
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        assertTrue(iter.hasNext());
-        assertEquals("((1,2),{(2,3),(2,4),(2,4),(2,4)})", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("((1,3),{(3,4)})", iter.next().toString());
-        assertTrue(iter.hasNext());
-        assertEquals("((2,3),{(3,4)})", iter.next().toString());
-        assertFalse(iter.hasNext());
+        Schema s = pigServer.dumpSchema("C");
+        
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        
         Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
     }
 }
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=1196747&r1=1196746&r2=1196747&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed Nov  2 18:32:28 2011
@@ -214,7 +214,7 @@ public class TestStreaming {
             }
     		
     		// Run the query and check the results
-    		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+    		Util.checkQueryOutputsAfterSort(pigServer.openIterator("OP"), expectedResults);
         }
 	}
 

Modified: pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=1196747&r1=1196746&r2=1196747&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Wed Nov  2 18:32:28 2011
@@ -197,7 +197,7 @@ public class TestStreamingLocal extends 
             }
 
             // Run the query and check the results
-            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+            Util.checkQueryOutputsAfterSort(pigServer.openIterator("OP"), expectedResults);
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1196747&r1=1196746&r2=1196747&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Nov  2 18:32:28 2011
@@ -58,8 +58,10 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.pig.ExecType;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
@@ -69,10 +71,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.SortedDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -88,6 +93,8 @@ import org.apache.pig.newplan.logical.op
 import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 import org.apache.pig.newplan.logical.optimizer.UidResetter;
 import org.apache.pig.newplan.logical.rules.LoadStoreFuncDupSignatureValidator;
 import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
@@ -1042,7 +1049,86 @@ public class Util {
         }
     }
 
-  
+    
+    static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt, 
+            Tuple[] expectedResArray) {
+        List<Tuple> list = new ArrayList<Tuple>();
+        Collections.addAll(list, expectedResArray);
+        checkQueryOutputsAfterSort(actualResultsIt, list);
+    }
+     
+    
+    static private void convertBagToSortedBag(Tuple t) {
+        for (int i=0;i<t.size();i++) {
+           Object obj = null;
+           try {
+               obj = t.get(i);
+           } catch (ExecException e) {
+               // shall not happen
+           }
+           if (obj instanceof DataBag) {
+                DataBag bag = (DataBag)obj;
+                Iterator<Tuple> iter = bag.iterator();
+                DataBag sortedBag = DefaultBagFactory.getInstance().newSortedBag(null);
+                while (iter.hasNext()) {
+                    Tuple t2 = iter.next();
+                    sortedBag.add(t2);
+                    convertBagToSortedBag(t2);
+                }
+                try {
+                    t.set(i, sortedBag);
+                } catch (ExecException e) {
+                    // shall not happen
+                }
+           }
+        }
+    }
+    
+    static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt, 
+            String[] expectedResArray, String schemaString) throws IOException {
+        LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString);
+        checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema);
+    }
+    /**
+     * Helper function to check if the result of a Pig Query is in line with 
+     * expected results. Both sides are sorted (nested bags included) before comparison
+     * 
+     * @param actualResultsIt Result of the executed Pig query
+     * @param expectedResArray Expected string results to validate against
+     * @param schema schema used to convert each string in expectedResArray to a Tuple
+     * @throws IOException if converting the expected results fails
+     */
+    static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt, 
+            String[] expectedResArray, LogicalSchema schema) throws IOException {
+        LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
+        ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
+        
+        LoadCaster caster = new Utf8StorageConverter();
+        List<Tuple> actualResList = new ArrayList<Tuple>();
+        while(actualResultsIt.hasNext()){
+            actualResList.add(actualResultsIt.next());
+        }
+        
+        List<Tuple> expectedResList = new ArrayList<Tuple>();
+        for (String str : expectedResArray) {
+            Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs);
+            expectedResList.add(newTuple);
+        }
+        
+        for (Tuple t : actualResList) {
+            convertBagToSortedBag(t);
+        }
+        
+        for (Tuple t : expectedResList) {
+            convertBagToSortedBag(t);
+        }
+        
+        Collections.sort(actualResList);
+        Collections.sort(expectedResList);
+        
+        Assert.assertEquals("Comparing actual and expected results. ",
+                expectedResList, actualResList);
+    }
 
     
 }