Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/12 01:43:53 UTC

svn commit: r1791057 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ test/org/apache/pig/spark/ test/org/apache/pig/test/data/GoldenFiles/spark/

Author: zly
Date: Wed Apr 12 01:43:53 2017
New Revision: 1791057

URL: http://svn.apache.org/viewvc?rev=1791057&view=rev
Log:
PIG-5202: Create a TestSparkCompiler test class for spark branch (Adam via Liyun)

Added:
    pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java
    pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/
    pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
    pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
    pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1791057&r1=1791056&r2=1791057&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Apr 12 01:43:53 2017
@@ -508,7 +508,7 @@ public class SparkLauncher extends Launc
         }
     }
 
-    private SparkOperPlan compile(PhysicalPlan physicalPlan,
+    public SparkOperPlan compile(PhysicalPlan physicalPlan,
                                   PigContext pigContext) throws PlanException, IOException,
             VisitorException {
         SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,

Added: pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java Wed Apr 12 01:43:53 2017
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Properties;
+import java.util.Random;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.Util;
+import org.apache.pig.test.utils.TestHelper;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases to test the SparkCompiler. VERY IMPORTANT NOTE: The tests here
+ * compare results with a "golden" set of outputs. In each test case here, the
+ * operators generated have a random operator key generated using Java's Random
+ * class. So if there is a code change that changes the number of operators
+ * created in a plan, then the "golden" file for that test case
+ * needs to be changed.
+ */
+
+public class TestSparkCompiler {
+    private static PigContext pc;
+    private static PigServer pigServer;
+    private static final int MAX_SIZE = 100000;
+
+    private enum PlanPrinter {
+        TEXT,
+        DOT,
+        XML;
+
+        public void doPrint(PrintStream ps, SparkOperPlan plan) throws VisitorException, ParserConfigurationException, TransformerException {
+            switch (this) {
+                case DOT:
+                    (new DotSparkPrinter(plan, ps)).dump();
+                    break;
+                case XML:
+                    XMLSparkPrinter printer = new XMLSparkPrinter(ps, plan);
+                    printer.visit();
+                    printer.closePlan();
+                    break;
+                case TEXT:
+                default:
+                    (new SparkPrinter(ps, plan)).visit();
+                    break;
+            }
+        }
+    }
+
+    // If, for some reason, the golden files need to be regenerated, set this to
+    // true - THIS WILL OVERWRITE THE GOLDEN FILES - so use this with caution
+    // and only for the test cases you need and are sure of.
+    private boolean generate = false;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        resetFileLocalizer();
+        pc = new PigContext(new SparkLocalExecType(), new Properties());
+        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        resetFileLocalizer();
+    }
+
+    @Before
+    public void setUp() throws ExecException {
+        resetScope();
+        pigServer = new PigServer(pc);
+    }
+
+    private void resetScope() {
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+    }
+
+    private static void resetFileLocalizer() {
+        FileLocalizer.deleteTempFiles();
+        FileLocalizer.setInitialized(false);
+        // Set random seed to generate deterministic temporary paths
+        FileLocalizer.setR(new Random(1331L));
+    }
+
+    @Test
+    public void testStoreLoad() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "store a into 'file:///tmp/pigoutput';" +
+                "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
+                "store b into 'file:///tmp/pigoutput1';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld", PlanPrinter.TEXT);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld", PlanPrinter.XML);
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld", PlanPrinter.DOT);
+    }
+
+    private void run(String query, String expectedFile, PlanPrinter planPrinter) throws Exception {
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        SparkLauncher launcher = new SparkLauncher();
+        pc.inExplain = true;
+        SparkOperPlan sparkOperPlan = launcher.compile(pp, pc);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        planPrinter.doPrint(ps, sparkOperPlan);
+        String compiledPlan = baos.toString();
+        System.out.println();
+        System.out.println("<<<" + compiledPlan + ">>>");
+
+        if (generate) {
+            FileOutputStream fos = new FileOutputStream(expectedFile);
+            fos.write(baos.toByteArray());
+            fos.close();
+            return;
+        }
+        FileInputStream fis = new FileInputStream(expectedFile);
+        byte[] b = new byte[MAX_SIZE];
+        int len = fis.read(b);
+        fis.close();
+        String goldenPlan = new String(b, 0, len);
+        if (goldenPlan.charAt(len-1) == '\n') {
+            goldenPlan = goldenPlan.substring(0, len-1);
+        }
+
+        System.out.println("-------------");
+        System.out.println("Golden");
+        System.out.println("<<<" + goldenPlan + ">>>");
+        System.out.println("-------------");
+
+        String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim();
+        String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim();
+        assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
+                TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
+    }
+
+}
+

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,71 @@
+digraph plan {
+compound=true;
+node [shape=rect];
+s487399236_in [label="", style=invis, height=0, width=0];
+s487399236_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399236 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s0_in [label="", style=invis, height=0, width=0];
+s0_out [label="", style=invis, height=0, width=0];
+subgraph cluster_0 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+487399275 [label="a: Load(file:///tmp/input,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399268_in [label="", style=invis, height=0, width=0];
+s487399268_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399268 {
+label="a: New For Each(false,false)[bag]"labelloc=b;
+487399274 [label="Project[bytearray][0]"];
+487399273 [label="Cast[int]"];
+487399274 -> 487399273
+s487399268_in -> 487399274 [style=invis];
+487399270 [label="Cast[int]"];
+487399271 [label="Project[bytearray][1]"];
+487399271 -> 487399270
+s487399268_in -> 487399271 [style=invis];
+};
+487399273 -> s487399268_out [style=invis];
+487399270 -> s487399268_out [style=invis];
+487399267 [label="a: Store(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399275 -> s487399268_in [lhead=cluster_487399268]
+s487399268_out -> 487399267
+s0_in -> 487399275 [style=invis];
+};
+487399267 -> s0_out [style=invis];
+s487399236_in -> s0_in [style=invis];
+};
+s0_out -> s487399236_out [style=invis];
+s487399235_in [label="", style=invis, height=0, width=0];
+s487399235_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399235 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s1_in [label="", style=invis, height=0, width=0];
+s1_out [label="", style=invis, height=0, width=0];
+subgraph cluster_1 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+s487399238_in [label="", style=invis, height=0, width=0];
+s487399238_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399238 {
+label="b: New For Each(false,false)[bag]"labelloc=b;
+487399244 [label="Project[bytearray][0]"];
+487399243 [label="Cast[int]"];
+487399244 -> 487399243
+s487399238_in -> 487399244 [style=invis];
+487399241 [label="Project[bytearray][1]"];
+487399240 [label="Cast[int]"];
+487399241 -> 487399240
+s487399238_in -> 487399241 [style=invis];
+};
+487399243 -> s487399238_out [style=invis];
+487399240 -> s487399238_out [style=invis];
+487399237 [label="b: Store(file:///tmp/pigoutput1,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399266 [label="b: Load(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399238_out -> 487399237
+487399266 -> s487399238_in [lhead=cluster_487399238]
+s1_in -> 487399266 [style=invis];
+};
+487399237 -> s1_out [style=invis];
+s487399235_in -> s1_in [style=invis];
+};
+s1_out -> s487399235_out [style=invis];
+s487399236_out -> s487399235_in [lhead=cluster_487399235]
+}

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# Spark Plan                                  
+#--------------------------------------------------
+
+Spark node scope-18
+a: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-8
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[int] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0--------
+
+Spark node scope-19
+b: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-16
+    |   |
+    |   Cast[int] - scope-11
+    |   |
+    |   |---Project[bytearray][0] - scope-10
+    |   |
+    |   Cast[int] - scope-14
+    |   |
+    |   |---Project[bytearray][1] - scope-13
+    |
+    |---b: Load(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-9--------

Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,49 @@
+<sparkPlan>
+  <sparkNode scope="38">
+    <POStore scope="28">
+      <alias>a</alias>
+      <storeFile>file:///tmp/pigoutput</storeFile>
+      <isTmpStore>false</isTmpStore>
+      <POForEach scope="27">
+        <alias>a</alias>
+        <POCast scope="22">
+          <alias>x</alias>
+          <POProject scope="21"/>
+        </POCast>
+        <POCast scope="25">
+          <alias>y</alias>
+          <POProject scope="24"/>
+        </POCast>
+        <POLoad scope="20">
+          <alias>a</alias>
+          <loadFile>file:///tmp/input</loadFile>
+          <isTmpLoad>false</isTmpLoad>
+        </POLoad>
+      </POForEach>
+    </POStore>
+  </sparkNode>
+  <sparkNode scope="39">
+    <POStore scope="37">
+      <alias>b</alias>
+      <storeFile>file:///tmp/pigoutput1</storeFile>
+      <isTmpStore>false</isTmpStore>
+      <POForEach scope="36">
+        <alias>b</alias>
+        <POCast scope="31">
+          <alias>x</alias>
+          <POProject scope="30"/>
+        </POCast>
+        <POCast scope="34">
+          <alias>y</alias>
+          <POProject scope="33"/>
+        </POCast>
+        <POLoad scope="29">
+          <alias>b</alias>
+          <loadFile>file:///tmp/pigoutput</loadFile>
+          <isTmpLoad>false</isTmpLoad>
+        </POLoad>
+      </POForEach>
+    </POStore>
+  </sparkNode>
+</sparkPlan>
+