Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/20 15:23:22 UTC

svn commit: r1680561 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Author: xuefu
Date: Wed May 20 13:23:21 2015
New Revision: 1680561

URL: http://svn.apache.org/r1680561
Log:
PIG-4552: Fix TestForEachNestedPlanLocal for Spark engine (Mohit via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 20 13:23:21 2015
@@ -458,21 +458,23 @@ public class SparkLauncher extends Launc
 			SparkPigStats sparkStats, JobConf jobConf) throws IOException,
 			InterruptedException {
 		Set<Integer> seenJobIDs = new HashSet<Integer>();
-		if (sparkPlan != null) {
-			List<SparkOperator> leaves = sparkPlan.getLeaves();
-			Collections.sort(leaves);
-			Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
-			if (LOG.isDebugEnabled())
-			    LOG.debug("Converting " + leaves.size() + " Spark Operators");
-			for (SparkOperator leaf : leaves) {
-				new PhyPlanSetter(leaf.physicalPlan).visit();
-				Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
-				sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
-						physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
-						jobConf);
-			}
-		} else {
-			throw new RuntimeException("sparkPlan is null");
+		if (sparkPlan == null) {
+			throw new RuntimeException("SparkPlan is null.");
+		}
+
+		List<SparkOperator> leaves = sparkPlan.getLeaves();
+		Collections.sort(leaves);
+		Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
+		}
+
+		for (SparkOperator leaf : leaves) {
+			new PhyPlanSetter(leaf.physicalPlan).visit();
+			Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+			sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+					physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+					jobConf);
 		}
 	}
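
The hunk above replaces a nested if/else with a guard clause: validate first, fail fast, and keep the main conversion loop at the top indentation level. A minimal sketch of the pattern under hypothetical names (not the actual Pig classes):

    import java.util.Arrays;
    import java.util.List;

    public class GuardClauseSketch {
        // Fail fast on bad input so the happy path needs no else branch.
        static void convertLeaves(List<String> leaves) {
            if (leaves == null) {
                throw new RuntimeException("leaves is null");
            }
            // Main logic sits at the top level instead of inside an else.
            for (String leaf : leaves) {
                System.out.println("converting " + leaf);
            }
        }

        public static void main(String[] args) {
            convertLeaves(Arrays.asList("load", "foreach", "store"));
        }
    }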
 
@@ -509,21 +511,21 @@ public class SparkLauncher extends Launc
 									+ "sparkOperator:{}",
 							sparkOperator.physicalPlan.getLeaves().size(),
 							sparkOperator.name()));
-		} else {
-			PhysicalOperator leafPO = leafPOs.get(0);
-			try {
-				physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-						predecessorRDDs, convertMap);
-				sparkOpRdds.put(sparkOperator.getOperatorKey(),
-						physicalOpRdds.get(leafPO.getOperatorKey()));
-			} catch(Exception e) {
-				if( e instanceof  SparkException) {
-					LOG.info("throw SparkException, error founds when running " +
-							"rdds in spark");
-				}
-				exception = e;
-				isFail = true;
+		}
+
+		PhysicalOperator leafPO = leafPOs.get(0);
+		try {
+			physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+					predecessorRDDs, convertMap);
+			sparkOpRdds.put(sparkOperator.getOperatorKey(),
+					physicalOpRdds.get(leafPO.getOperatorKey()));
+		} catch (Exception e) {
+			if (e instanceof SparkException) {
+				LOG.info("SparkException thrown: errors found while " +
+						"running RDDs in Spark");
 			}
+			exception = e;
+			isFail = true;
 		}
 
 		List<POStore> poStores = PlanHelper.getPhysicalOperators(
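
Note that dropping the else here changes control flow: when leafPOs does not hold exactly one operator, the method now logs the warning and then still calls leafPOs.get(0), which throws IndexOutOfBoundsException if the list is empty. A guard with an early return would preserve the old semantics; a minimal sketch with hypothetical names:

    import java.util.Collections;
    import java.util.List;

    public class LeafGuardSketch {
        // Early return makes the single-leaf assumption explicit and avoids
        // an IndexOutOfBoundsException on an empty list.
        static void runLeaf(List<String> leafPOs) {
            if (leafPOs.size() != 1) {
                System.err.println("expected exactly 1 leaf, got " + leafPOs.size());
                return;
            }
            System.out.println("running " + leafPOs.get(0));
        }

        public static void main(String[] args) {
            runLeaf(Collections.singletonList("POStore"));
            runLeaf(Collections.<String>emptyList()); // logs and returns
        }
    }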
@@ -541,10 +543,10 @@ public class SparkLauncher extends Launc
                         conf, exception);
             }
         } else {
-			      LOG.info(String
-					      .format(String.format("sparkOperator:{} does not have POStore or" +
-                    " sparkOperator has more than 1 POStore. {} is the size of POStore."),
-                    sparkOperator.name(), poStores.size()));
+			LOG.info(String.format(
+					"sparkOperator:%s does not have a POStore or has more than "
+							+ "one POStore; %d is the number of POStores.",
+					sparkOperator.name(), poStores.size()));
 		}
 	}
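
For reference, the {} markers that appear in these log messages are SLF4J-style placeholders; String.format does not interpolate them and silently ignores the surplus arguments, so % conversions such as %s and %d are needed instead. A quick illustration:

    public class FormatSketch {
        public static void main(String[] args) {
            // String.format uses % conversions, not SLF4J-style {} markers.
            System.out.println(String.format("op:%s has %d stores", "store-1", 2));
            // prints "op:store-1 has 2 stores"
            System.out.println(String.format("op:{} has {} stores", "store-1", 2));
            // prints "op:{} has {} stores" -- the arguments are dropped
        }
    }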
 
@@ -557,9 +559,10 @@ public class SparkLauncher extends Launc
         RDD<Tuple> nextRDD = null;
         List<PhysicalOperator> predecessors = plan
                 .getPredecessors(physicalOperator);
-        if (predecessors != null) {
+        if (predecessors != null && predecessors.size() > 1) {
             Collections.sort(predecessors);
         }
+
         List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
 		if (predecessors != null) {
 			for (PhysicalOperator predecessor : predecessors) {
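
The size() > 1 guard added above skips Collections.sort for the common zero- or one-predecessor case, where sorting is a no-op anyway; behavior is unchanged and only the trivial call is avoided. A small sketch of the same guard:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SortGuardSketch {
        public static void main(String[] args) {
            List<Integer> predecessors = new ArrayList<Integer>(Arrays.asList(3, 1, 2));
            // Sorting only matters with two or more elements; the guard
            // skips the call entirely for empty or singleton lists.
            if (predecessors.size() > 1) {
                Collections.sort(predecessors);
            }
            System.out.println(predecessors); // [1, 2, 3]
        }
    }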

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed May 20 13:23:21 2015
@@ -100,14 +100,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
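
The loop removed above compared expected and actual tuples positionally, which fails when an engine returns the same results in a different order, as Spark may. Util.checkQueryOutputsAfterSort, used in this hunk and the two below, presumably sorts both sides before comparing; a minimal sketch of that idea, assuming the elements are Comparable (the real helper works on Pig Tuple iterators):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class SortedCompareSketch {
        // Drain the iterator, sort both lists, then compare element-wise,
        // so the check is insensitive to the order results arrive in.
        static <T extends Comparable<T>> boolean sameAfterSort(
                Iterator<T> actualItr, List<T> expected) {
            List<T> actual = new ArrayList<T>();
            while (actualItr.hasNext()) {
                actual.add(actualItr.next());
            }
            List<T> expectedCopy = new ArrayList<T>(expected);
            Collections.sort(actual);
            Collections.sort(expectedCopy);
            return actual.equals(expectedCopy);
        }

        public static void main(String[] args) {
            List<String> expected = Arrays.asList("a", "b", "c");
            Iterator<String> actual = Arrays.asList("c", "a", "b").iterator();
            System.out.println(sameAfterSort(actual, expected)); // true
        }
    }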
@@ -130,14 +124,8 @@ public class TestForEachNestedPlanLocal
                 + "crossed = cross user, distinct_session;"
                 + "filtered = filter crossed by user::region == distinct_session::region;"
                 + "generate filtered;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
@@ -161,14 +149,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session, profile;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     /*