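You are viewing a plain text version of this content. The canonical (HTML) version is available in the original mailing-list archive; the hyperlink to it was lost in this plain-text rendering.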
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/20 15:23:22 UTC
svn commit: r1680561 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
test/org/apache/pig/test/TestForEachNestedPlanLocal.java
Author: xuefu
Date: Wed May 20 13:23:21 2015
New Revision: 1680561
URL: http://svn.apache.org/r1680561
Log:
PIG-4552: Fix TestForEachNestedPlanLocal for Spark engine (Mohit via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed May 20 13:23:21 2015
@@ -458,21 +458,23 @@ public class SparkLauncher extends Launc
SparkPigStats sparkStats, JobConf jobConf) throws IOException,
InterruptedException {
Set<Integer> seenJobIDs = new HashSet<Integer>();
- if (sparkPlan != null) {
- List<SparkOperator> leaves = sparkPlan.getLeaves();
- Collections.sort(leaves);
- Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
- if (LOG.isDebugEnabled())
- LOG.debug("Converting " + leaves.size() + " Spark Operators");
- for (SparkOperator leaf : leaves) {
- new PhyPlanSetter(leaf.physicalPlan).visit();
- Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
- sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
- physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
- jobConf);
- }
- } else {
- throw new RuntimeException("sparkPlan is null");
+ if (sparkPlan == null) {
+ throw new RuntimeException("SparkPlan is null.");
+ }
+
+ List<SparkOperator> leaves = sparkPlan.getLeaves();
+ Collections.sort(leaves);
+ Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
+ }
+
+ for (SparkOperator leaf : leaves) {
+ new PhyPlanSetter(leaf.physicalPlan).visit();
+ Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
+ sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+ physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+ jobConf);
}
}
@@ -509,21 +511,21 @@ public class SparkLauncher extends Launc
+ "sparkOperator:{}",
sparkOperator.physicalPlan.getLeaves().size(),
sparkOperator.name()));
- } else {
- PhysicalOperator leafPO = leafPOs.get(0);
- try {
- physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
- physicalOpRdds.get(leafPO.getOperatorKey()));
- } catch(Exception e) {
- if( e instanceof SparkException) {
- LOG.info("throw SparkException, error founds when running " +
- "rdds in spark");
- }
- exception = e;
- isFail = true;
+ }
+
+ PhysicalOperator leafPO = leafPOs.get(0);
+ try {
+ physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+ predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+ physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch(Exception e) {
+ if( e instanceof SparkException) {
+ LOG.info("throw SparkException, error founds when running " +
+ "rdds in spark");
}
+ exception = e;
+ isFail = true;
}
List<POStore> poStores = PlanHelper.getPhysicalOperators(
@@ -541,10 +543,10 @@ public class SparkLauncher extends Launc
conf, exception);
}
} else {
- LOG.info(String
- .format(String.format("sparkOperator:{} does not have POStore or" +
- " sparkOperator has more than 1 POStore. {} is the size of POStore."),
- sparkOperator.name(), poStores.size()));
+ LOG.info(String
+ .format(String.format("sparkOperator:{} does not have POStore or" +
+ " sparkOperator has more than 1 POStore. {} is the size of POStore."),
+ sparkOperator.name(), poStores.size()));
}
}
@@ -557,9 +559,10 @@ public class SparkLauncher extends Launc
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
- if (predecessors != null) {
+ if (predecessors != null && predecessors.size() > 1) {
Collections.sort(predecessors);
}
+
List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
if (predecessors != null) {
for (PhysicalOperator predecessor : predecessors) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1680561&r1=1680560&r2=1680561&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed May 20 13:23:21 2015
@@ -100,14 +100,8 @@ public class TestForEachNestedPlanLocal
pig.registerQuery("D = foreach C {"
+ "crossed = cross user, session;"
+ "generate crossed;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
@Test
@@ -130,14 +124,8 @@ public class TestForEachNestedPlanLocal
+ "crossed = cross user, distinct_session;"
+ "filtered = filter crossed by user::region == distinct_session::region;"
+ "generate filtered;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
@Test
@@ -161,14 +149,8 @@ public class TestForEachNestedPlanLocal
pig.registerQuery("D = foreach C {"
+ "crossed = cross user, session, profile;"
+ "generate crossed;" + "}");
- Iterator<Tuple> expectedItr = expectedResults.iterator();
Iterator<Tuple> actualItr = pig.openIterator("D");
- while (expectedItr.hasNext() && actualItr.hasNext()) {
- Tuple expectedTuple = expectedItr.next();
- Tuple actualTuple = actualItr.next();
- assertEquals(expectedTuple, actualTuple);
- }
- assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
}
/*