You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/12 01:43:53 UTC
svn commit: r1791057 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/
test/org/apache/pig/spark/ test/org/apache/pig/test/data/GoldenFiles/spark/
Author: zly
Date: Wed Apr 12 01:43:53 2017
New Revision: 1791057
URL: http://svn.apache.org/viewvc?rev=1791057&view=rev
Log:
PIG-5202: Create a TestSparkCompiler test class for the spark branch (Adam via Liyun)
Added:
pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1791057&r1=1791056&r2=1791057&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Apr 12 01:43:53 2017
@@ -508,7 +508,7 @@ public class SparkLauncher extends Launc
}
}
- private SparkOperPlan compile(PhysicalPlan physicalPlan,
+ public SparkOperPlan compile(PhysicalPlan physicalPlan,
PigContext pigContext) throws PlanException, IOException,
VisitorException {
SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
Added: pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestSparkCompiler.java Wed Apr 12 01:43:53 2017
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+import java.util.Random;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.Util;
+import org.apache.pig.test.utils.TestHelper;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases to test the SparkCompiler. VERY IMPORTANT NOTE: The tests here
+ * compare results with a "golden" set of outputs. In each test case here, the
+ * operators generated have a random operator key which uses Java's Random
+ * class. So if there is a code change which changes the number of operators
+ * created in a plan, then the "golden" file for that test case
+ * needs to be changed.
+ */
+public class TestSparkCompiler {
+    private static final String GOLDEN_DIR =
+            "test/org/apache/pig/test/data/GoldenFiles/spark/";
+    // Compiled plans and golden files are always encoded/decoded with this
+    // charset, so the comparison does not depend on the platform default.
+    private static final String ENCODING = "UTF-8";
+    private static final int BUFFER_SIZE = 8192;
+
+    private static PigContext pc;
+    private PigServer pigServer;
+
+    /** The plan printers whose output is compared against golden files. */
+    private enum PlanPrinter {
+        TEXT,
+        DOT,
+        XML;
+
+        /** Prints {@code plan} to {@code ps} in this printer's format. */
+        public void doPrint(PrintStream ps, SparkOperPlan plan) throws VisitorException,
+                ParserConfigurationException, TransformerException {
+            switch (this) {
+                case DOT:
+                    new DotSparkPrinter(plan, ps).dump();
+                    break;
+                case XML: {
+                    XMLSparkPrinter printer = new XMLSparkPrinter(ps, plan);
+                    printer.visit();
+                    printer.closePlan();
+                    break;
+                }
+                case TEXT:
+                default:
+                    new SparkPrinter(ps, plan).visit();
+                    break;
+            }
+        }
+    }
+
+    // If for some reason, the golden files need to be regenerated, set this to
+    // true - THIS WILL OVERWRITE THE GOLDEN FILES - So use this with caution
+    // and only for the test cases you need and are sure of.
+    private boolean generate = false;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        resetFileLocalizer();
+        pc = new PigContext(new SparkLocalExecType(), new Properties());
+        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        resetFileLocalizer();
+    }
+
+    @Before
+    public void setUp() throws ExecException {
+        resetScope();
+        pigServer = new PigServer(pc);
+    }
+
+    private void resetScope() {
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+    }
+
+    private static void resetFileLocalizer() {
+        FileLocalizer.deleteTempFiles();
+        FileLocalizer.setInitialized(false);
+        // Set random seed to generate deterministic temporary paths
+        FileLocalizer.setR(new Random(1331L));
+    }
+
+    @Test
+    public void testStoreLoad() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "store a into 'file:///tmp/pigoutput';" +
+                "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
+                "store b into 'file:///tmp/pigoutput1';";
+
+        run(query, GOLDEN_DIR + "SPARKC-LoadStore-1-text.gld", PlanPrinter.TEXT);
+        run(query, GOLDEN_DIR + "SPARKC-LoadStore-1-xml.gld", PlanPrinter.XML);
+        run(query, GOLDEN_DIR + "SPARKC-LoadStore-1-dot.gld", PlanPrinter.DOT);
+    }
+
+    /**
+     * Compiles {@code query} to a Spark operator plan, prints it with
+     * {@code planPrinter} and asserts it matches the golden file
+     * {@code expectedFile}. When {@code generate} is true the golden file is
+     * overwritten with the compiled plan instead.
+     */
+    private void run(String query, String expectedFile, PlanPrinter planPrinter) throws Exception {
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        SparkLauncher launcher = new SparkLauncher();
+        pc.inExplain = true;
+        SparkOperPlan sparkOperPlan = launcher.compile(pp, pc);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos, true, ENCODING);
+        planPrinter.doPrint(ps, sparkOperPlan);
+        ps.flush(); // make sure everything the printer wrote has reached baos
+        String compiledPlan = baos.toString(ENCODING);
+        System.out.println();
+        System.out.println("<<<" + compiledPlan + ">>>");
+
+        if (generate) {
+            try (FileOutputStream fos = new FileOutputStream(expectedFile)) {
+                fos.write(baos.toByteArray());
+            }
+            return;
+        }
+
+        String goldenPlan = readFully(expectedFile);
+        if (goldenPlan.endsWith("\n")) {
+            goldenPlan = goldenPlan.substring(0, goldenPlan.length() - 1);
+        }
+
+        System.out.println("-------------");
+        System.out.println("Golden");
+        System.out.println("<<<" + goldenPlan + ">>>");
+        System.out.println("-------------");
+
+        String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim();
+        String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim();
+        assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
+                TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
+    }
+
+    /**
+     * Reads the whole file at {@code path} and decodes it as UTF-8.
+     * A single InputStream.read() call may return fewer bytes than the file
+     * holds, so this keeps reading until end of stream.
+     */
+    private static String readFully(String path) throws IOException {
+        ByteArrayOutputStream content = new ByteArrayOutputStream();
+        try (FileInputStream in = new FileInputStream(path)) {
+            byte[] buffer = new byte[BUFFER_SIZE];
+            int n;
+            while ((n = in.read(buffer)) != -1) {
+                content.write(buffer, 0, n);
+            }
+        }
+        return content.toString(ENCODING);
+    }
+}
+
Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,71 @@
+digraph plan {
+compound=true;
+node [shape=rect];
+s487399236_in [label="", style=invis, height=0, width=0];
+s487399236_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399236 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s0_in [label="", style=invis, height=0, width=0];
+s0_out [label="", style=invis, height=0, width=0];
+subgraph cluster_0 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+487399275 [label="a: Load(file:///tmp/input,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399268_in [label="", style=invis, height=0, width=0];
+s487399268_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399268 {
+label="a: New For Each(false,false)[bag]"labelloc=b;
+487399274 [label="Project[bytearray][0]"];
+487399273 [label="Cast[int]"];
+487399274 -> 487399273
+s487399268_in -> 487399274 [style=invis];
+487399270 [label="Cast[int]"];
+487399271 [label="Project[bytearray][1]"];
+487399271 -> 487399270
+s487399268_in -> 487399271 [style=invis];
+};
+487399273 -> s487399268_out [style=invis];
+487399270 -> s487399268_out [style=invis];
+487399267 [label="a: Store(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399275 -> s487399268_in [lhead=cluster_487399268]
+s487399268_out -> 487399267
+s0_in -> 487399275 [style=invis];
+};
+487399267 -> s0_out [style=invis];
+s487399236_in -> s0_in [style=invis];
+};
+s0_out -> s487399236_out [style=invis];
+s487399235_in [label="", style=invis, height=0, width=0];
+s487399235_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399235 {
+label="Spark(-1,PigStorage)"; style="filled"; fillcolor="#EEEEEE"labelloc=b;
+s1_in [label="", style=invis, height=0, width=0];
+s1_out [label="", style=invis, height=0, width=0];
+subgraph cluster_1 {
+label="spark"; style="filled"; fillcolor="white"labelloc=b;
+s487399238_in [label="", style=invis, height=0, width=0];
+s487399238_out [label="", style=invis, height=0, width=0];
+subgraph cluster_487399238 {
+label="b: New For Each(false,false)[bag]"labelloc=b;
+487399244 [label="Project[bytearray][0]"];
+487399243 [label="Cast[int]"];
+487399244 -> 487399243
+s487399238_in -> 487399244 [style=invis];
+487399241 [label="Project[bytearray][1]"];
+487399240 [label="Cast[int]"];
+487399241 -> 487399240
+s487399238_in -> 487399241 [style=invis];
+};
+487399243 -> s487399238_out [style=invis];
+487399240 -> s487399238_out [style=invis];
+487399237 [label="b: Store(file:///tmp/pigoutput1,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+487399266 [label="b: Load(file:///tmp/pigoutput,\norg.apache.pig.builtin.PigStorage)", style="filled", fillcolor="gray"];
+s487399238_out -> 487399237
+487399266 -> s487399238_in [lhead=cluster_487399238]
+s1_in -> 487399266 [style=invis];
+};
+487399237 -> s1_out [style=invis];
+s487399235_in -> s1_in [style=invis];
+};
+s1_out -> s487399235_out [style=invis];
+s487399236_out -> s487399235_in [lhead=cluster_487399235]
+}
Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,33 @@
+#--------------------------------------------------
+# Spark Plan
+#--------------------------------------------------
+
+Spark node scope-18
+a: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-8
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0--------
+
+Spark node scope-19
+b: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-17
+|
+|---b: New For Each(false,false)[bag] - scope-16
+ | |
+ | Cast[int] - scope-11
+ | |
+ | |---Project[bytearray][0] - scope-10
+ | |
+ | Cast[int] - scope-14
+ | |
+ | |---Project[bytearray][1] - scope-13
+ |
+ |---b: Load(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-9--------
Added: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld?rev=1791057&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld (added)
+++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld Wed Apr 12 01:43:53 2017
@@ -0,0 +1,49 @@
+<sparkPlan>
+ <sparkNode scope="38">
+ <POStore scope="28">
+ <alias>a</alias>
+ <storeFile>file:///tmp/pigoutput</storeFile>
+ <isTmpStore>false</isTmpStore>
+ <POForEach scope="27">
+ <alias>a</alias>
+ <POCast scope="22">
+ <alias>x</alias>
+ <POProject scope="21"/>
+ </POCast>
+ <POCast scope="25">
+ <alias>y</alias>
+ <POProject scope="24"/>
+ </POCast>
+ <POLoad scope="20">
+ <alias>a</alias>
+ <loadFile>file:///tmp/input</loadFile>
+ <isTmpLoad>false</isTmpLoad>
+ </POLoad>
+ </POForEach>
+ </POStore>
+ </sparkNode>
+ <sparkNode scope="39">
+ <POStore scope="37">
+ <alias>b</alias>
+ <storeFile>file:///tmp/pigoutput1</storeFile>
+ <isTmpStore>false</isTmpStore>
+ <POForEach scope="36">
+ <alias>b</alias>
+ <POCast scope="31">
+ <alias>x</alias>
+ <POProject scope="30"/>
+ </POCast>
+ <POCast scope="34">
+ <alias>y</alias>
+ <POProject scope="33"/>
+ </POCast>
+ <POLoad scope="29">
+ <alias>b</alias>
+ <loadFile>file:///tmp/pigoutput</loadFile>
+ <isTmpLoad>false</isTmpLoad>
+ </POLoad>
+ </POForEach>
+ </POStore>
+ </sparkNode>
+</sparkPlan>
+