You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/09/07 04:26:10 UTC
svn commit: r1759542 -
/pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
Author: xuefu
Date: Wed Sep 7 04:26:10 2016
New Revision: 1759542
URL: http://svn.apache.org/viewvc?rev=1759542&view=rev
Log:
PIG-4870: Enable MergeJoin testcase in TestCollectedGroup for spark engine (Xianda via Xuefu)
Modified:
pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
Modified: pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java?rev=1759542&r1=1759541&r2=1759542&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java Wed Sep 7 04:26:10 2016
@@ -294,40 +294,38 @@ public class TestCollectedGroup {
@Test
public void testMapsideGroupWithMergeJoin() throws IOException{
- if( !Util.isSparkExecType(cluster.getExecType())) {
- pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
- pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
- pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
- try {
- DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
- DataBag dbshj = BagFactory.getInstance().newDefaultBag();
- {
- pigServer.registerQuery("C = join A by id, B by id using 'merge';");
- pigServer.registerQuery("D = group C by A::id using 'collected';");
- pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
- Iterator<Tuple> iter = pigServer.openIterator("E");
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id, B by id using 'merge';");
+ pigServer.registerQuery("D = group C by A::id using 'collected';");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(C);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
- while (iter.hasNext()) {
- dbfrj.add(iter.next());
- }
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
}
- {
- pigServer.registerQuery("F = join A by id, B by id;");
- pigServer.registerQuery("G = group F by A::id;");
- pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
- Iterator<Tuple> iter = pigServer.openIterator("H");
+ }
+ {
+ pigServer.registerQuery("F = join A by id, B by id;");
+ pigServer.registerQuery("G = group F by A::id;");
+ pigServer.registerQuery("H = foreach G generate group, COUNT(F);");
+ Iterator<Tuple> iter = pigServer.openIterator("H");
- while (iter.hasNext()) {
- dbshj.add(iter.next());
- }
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
}
- Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
}
+ Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
}
}