You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/07 22:39:30 UTC
svn commit: r1678261 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/plan/
test/org/apache/pig/test/
Author: xuefu
Date: Thu May 7 20:39:29 2015
New Revision: 1678261
URL: http://svn.apache.org/r1678261
Log:
PIG-4421: implement visitSkewedJoin in SparkCompiler (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
pig/branches/spark/test/org/apache/pig/test/Util.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu May 7 20:39:29 2015
@@ -535,10 +535,13 @@ public class SparkLauncher extends Launc
List<RDD<Tuple>> rddsFromPredeSparkOper,
Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
throws IOException {
- RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessors = plan
- .getPredecessors(physicalOperator);
- List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
+ RDD<Tuple> nextRDD = null;
+ List<PhysicalOperator> predecessors = plan
+ .getPredecessors(physicalOperator);
+ if (predecessors != null) {
+ Collections.sort(predecessors);
+ }
+ List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
if (predecessors != null) {
for (PhysicalOperator predecessor : predecessors) {
physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu May 7 20:39:29 2015
@@ -600,10 +600,26 @@ public class SparkCompiler extends PhyPl
}
}
- @Override
- public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
- // TODO
- }
+    /**
+     * Compiles a skewed join by adding the operator to the plan unchanged; for
+     * now a regular join stands in for the skewed-join algorithm, which itself
+     * only handles two-table inner joins.
+     * Background: https://wiki.apache.org/pig/PigSkewedJoinSpec
+     * @param op the skewed-join physical operator being visited
+     * @throws VisitorException when adding the operator fails (raised as SparkCompilerException, code 2034)
+     */
+    @Override
+    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception cause) {
+            final String operatorName = op.getClass().getSimpleName();
+            throw new SparkCompilerException(
+                    "Error compiling operator " + operatorName,
+                    2034, PigException.BUG, cause);
+        }
+    }
@Override
public void visitFRJoin(POFRJoin op) throws VisitorException {
Modified: pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java Thu May 7 20:39:29 2015
@@ -284,6 +284,13 @@ public class TestSkewedJoin {
@Test
public void testSkewedJoinKeyPartition() throws IOException {
+ // This test relies on how the keys are distributed in Skew Join implementation.
+ // Spark engine currently implements skew join as regular join, and hence does
+ // not control key distribution.
+ // TODO: Enable this test when Spark engine implements Skew Join algorithm.
+ if (Util.isSparkExecType(cluster.getExecType()))
+ return;
+
String outputDir = "testSkewedJoinKeyPartition";
try{
Util.deleteFile(cluster, outputDir);
Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu May 7 20:39:29 2015
@@ -1402,4 +1402,12 @@ public class Util {
});
return parts[0];
}
+
+    /**
+     * Tells whether the given exec type's name starts with "spark",
+     * ignoring case (the name is lower-cased before the prefix check).
+     */
+    public static boolean isSparkExecType(ExecType execType) {
+        return execType.name().toLowerCase().startsWith("spark");
+    }
}