You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/09/07 04:26:10 UTC

svn commit: r1759542 - /pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java

Author: xuefu
Date: Wed Sep  7 04:26:10 2016
New Revision: 1759542

URL: http://svn.apache.org/viewvc?rev=1759542&view=rev
Log:
PIG-4870: Enable MergeJoin testcase in TestCollectedGroup for spark engine (Xianda via Xuefu)

Modified:
    pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java

Modified: pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java?rev=1759542&r1=1759541&r2=1759542&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java Wed Sep  7 04:26:10 2016
@@ -294,40 +294,38 @@ public class TestCollectedGroup {
 
     @Test
     public void testMapsideGroupWithMergeJoin() throws IOException{
-        if( !Util.isSparkExecType(cluster.getExecType())) {
-            pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
-            pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
-            pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
-            try {
-                DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
-                DataBag dbshj = BagFactory.getInstance().newDefaultBag();
-                {
-                    pigServer.registerQuery("C = join A by id, B by id using 'merge';");
-                    pigServer.registerQuery("D = group C by A::id using 'collected';");
-                    pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
-                    Iterator<Tuple> iter = pigServer.openIterator("E");
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id, B by id using 'merge';");
+                pigServer.registerQuery("D = group C by A::id using 'collected';");
+                pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
 
-                    while (iter.hasNext()) {
-                        dbfrj.add(iter.next());
-                    }
+                while (iter.hasNext()) {
+                    dbfrj.add(iter.next());
                 }
-                {
-                    pigServer.registerQuery("F = join A by id, B by id;");
-                    pigServer.registerQuery("G = group F by A::id;");
-                    pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
-                    Iterator<Tuple> iter = pigServer.openIterator("H");
+            }
+            {
+                pigServer.registerQuery("F = join A by id, B by id;");
+                pigServer.registerQuery("G = group F by A::id;");
+                pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
+                Iterator<Tuple> iter = pigServer.openIterator("H");
 
-                    while (iter.hasNext()) {
-                        dbshj.add(iter.next());
-                    }
+                while (iter.hasNext()) {
+                    dbshj.add(iter.next());
                 }
-                Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
-                Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
-
-            } catch (Exception e) {
-                e.printStackTrace();
-                Assert.fail(e.getMessage());
             }
+            Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
         }
     }