You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/04 20:22:33 UTC

svn commit: r1706711 - in /pig/trunk: CHANGES.txt src/org/apache/pig/scripting/BoundScript.java test/org/apache/pig/test/TestPigServerLocal.java

Author: rohini
Date: Sun Oct  4 18:22:32 2015
New Revision: 1706711

URL: http://svn.apache.org/viewvc?rev=1706711&view=rev
Log:
PIG-4670: Embedded Python scripts still parse line by line (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/scripting/BoundScript.java
    pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Oct  4 18:22:32 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4670: Embedded Python scripts still parse line by line (rohini)
+
 PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)
 
 PIG-4673: Built In UDF - REPLACE_MULTI : For a given string, search and replace all occurrences

Modified: pig/trunk/src/org/apache/pig/scripting/BoundScript.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/BoundScript.java?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/BoundScript.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/BoundScript.java Sun Oct  4 18:22:32 2015
@@ -31,8 +31,8 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigServer;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.PigServer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.pigscript.parser.ParseException;
@@ -45,41 +45,41 @@ import org.apache.pig.tools.pigstats.Scr
  * This represents an instance of a bound pipeline.
  */
 public class BoundScript {
-    
+
     private static final Log LOG = LogFactory.getLog(BoundScript.class);
-    
+
     private List<String> queries = new ArrayList<String>();
 
     private String name = null;
-   
+
     private ScriptPigContext scriptContext = null;
-    
+
     BoundScript(String query, ScriptPigContext scriptContext, String name) {
         this.queries.add(query);
         this.scriptContext = scriptContext;
-        this.name = name;               
+        this.name = name;
     }
-    
+
     BoundScript(List<String> queries, ScriptPigContext scriptContext,
             String name) {
         this.queries.addAll(queries);
         this.scriptContext = ScriptPigContext.get();
-        this.name = name;        
+        this.name = name;
     }
-    
+
     /**
-     * Run a pipeline on Hadoop.  
-     * If there are no stores in this pipeline then nothing will be run. 
+     * Run a pipeline on Hadoop.
+     * If there are no stores in this pipeline then nothing will be run.
      * @return {@link PigStats}, null if there is no bound query to run.
      * @throws IOException
      */
     public PigStats runSingle() throws IOException {
         return runSingle((Properties)null);
     }
-     
+
     /**
-     * Run a pipeline on Hadoop.  
-     * If there are no stores in this pipeline then nothing will be run.  
+     * Run a pipeline on Hadoop.
+     * If there are no stores in this pipeline then nothing will be run.
      * @param prop Map of properties that Pig should set when running the script.
      * This is intended for use with scripting languages that do not support
      * the Properties object.
@@ -98,14 +98,14 @@ public class BoundScript {
         if (prop != null) {
             scriptContext.getPigContext().getProperties().putAll(prop);
         }
-        PigStats ret = exec(queries.get(0)); 
+        PigStats ret = exec(queries.get(0));
         setPigStats(ret);
         return ret;
     }
-    
+
     /**
-     * Run a pipeline on Hadoop.  
-     * If there are no stores in this pipeline then nothing will be run.  
+     * Run a pipeline on Hadoop.
+     * If there are no stores in this pipeline then nothing will be run.
      * @param propfile File with properties that Pig should set when running the script.
      * @return {@link PigStats}, null if there is no bound query to run.
      * @throws IOException
@@ -123,17 +123,17 @@ public class BoundScript {
     }
 
     /**
-     * Run multiple instances of bound pipeline on Hadoop in parallel.  
-     * If there are no stores in this pipeline then nothing will be run.  
-     * Bind is called first with the list of maps of variables to bind. 
+     * Run multiple instances of bound pipeline on Hadoop in parallel.
+     * If there are no stores in this pipeline then nothing will be run.
+     * Bind is called first with the list of maps of variables to bind.
      * @return a list of {@link PigStats}, one for each map of variables passed
      * to bind.
      * @throws IOException
-     */    
-    public List<PigStats> run() throws IOException {    
+     */
+    public List<PigStats> run() throws IOException {
         return run((Properties)null);
     }
-    
+
     /**
      * Run multiple instances of bound pipeline on Hadoop in parallel.
      * @param prop Map of properties that Pig should set when running the script.
@@ -148,7 +148,7 @@ public class BoundScript {
         if (queries.isEmpty()) {
             LOG.info("No bound query to run.");
             return stats;
-        } 
+        }
         if (queries.size() == 1) {
             PigStats ps = runSingle();
             stats.add(ps);
@@ -157,20 +157,20 @@ public class BoundScript {
         if (prop != null) {
             scriptContext.getPigContext().getProperties().putAll(prop);
         }
-        List<PigProgressNotificationListener> listeners 
+        List<PigProgressNotificationListener> listeners
             = ScriptState.get().getAllListeners();
-        SyncProgressNotificationAdaptor adaptor 
+        SyncProgressNotificationAdaptor adaptor
             = new SyncProgressNotificationAdaptor(listeners);
         List<Future<PigStats>> futures = new ArrayList<Future<PigStats>>();
         ExecutorService executor = Executors.newFixedThreadPool(queries.size());
-        for (int i=0; i<queries.size(); i++) {          
+        for (int i=0; i<queries.size(); i++) {
             Properties props = new Properties();
             props.putAll(scriptContext.getPigContext().getProperties());
             PigContext ctx = new PigContext(scriptContext.getPigContext().getExecType(), props);
             MyCallable worker = new MyCallable(queries.get(i), ctx, adaptor);
             Future<PigStats> submit = executor.submit(worker);
             futures.add(submit);
-        }           
+        }
         for (Future<PigStats> future : futures) {
             try {
                 stats.add(future.get());
@@ -178,23 +178,23 @@ public class BoundScript {
                 LOG.error("Pig pipeline failed to complete", e);
                 PigStatsUtil.getEmptyPigStats();
                 PigStatsUtil.setErrorMessage(e.getMessage());
-                PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);                    
+                PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
                 stats.add(failed);
             } catch (ExecutionException e) {
                 LOG.error("Pig pipeline failed to complete", e);
                 PigStatsUtil.getEmptyPigStats();
-                PigStatsUtil.setErrorMessage(e.getMessage());                  
-                PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);                    
+                PigStatsUtil.setErrorMessage(e.getMessage());
+                PigStats failed = PigStatsUtil.getPigStats(ReturnCode.FAILURE);
                 stats.add(failed);
             }
         }
-    
+
         if (!stats.isEmpty()) {
             setPigStats(stats);;
         }
         return stats;
     }
-    
+
     /**
      * Run multiple instances of bound pipeline on Hadoop in parallel.
      * @param propfile File with properties that Pig should set when running the script.
@@ -210,12 +210,12 @@ public class BoundScript {
             prop.load(fin);
         } finally {
             if (fin != null) fin.close();
-        }        
+        }
         return run(prop);
     }
 
     /**
-     * Run illustrate for this pipeline.  Results will be printed to stdout.  
+     * Run illustrate for this pipeline.  Results will be printed to stdout.
      * @throws IOException if illustrate fails.
      */
     public void illustrate() throws IOException {
@@ -255,10 +255,10 @@ public class BoundScript {
         }
         PigServer pigServer = new PigServer(scriptContext.getPigContext(), false);
         registerQuery(pigServer, queries.get(0));
-        pigServer.dumpSchema(alias);        
+        pigServer.dumpSchema(alias);
     }
 
-    //-------------------------------------------------------------------------      
+    //-------------------------------------------------------------------------
 
     private PigStats exec(String query) throws IOException {
         LOG.info("Query to run:\n" + query);
@@ -271,30 +271,27 @@ public class BoundScript {
             ScriptState.get().registerListener(listener);
         }
         PigServer pigServer = new PigServer(scriptContext.getPigContext(), false);
-        pigServer.setBatchOn();
         GruntParser grunt = new GruntParser(new StringReader(query), pigServer);
         grunt.setInteractive(false);
         try {
-            grunt.parseStopOnError(true);
+            grunt.parseStopOnError(false);
         } catch (ParseException e) {
             throw new IOException("Failed to parse script " + e.getMessage(), e);
         }
-        pigServer.executeBatch();
         return PigStats.get();
     }
 
     private void registerQuery(PigServer pigServer, String pl) throws IOException {
         GruntParser grunt = new GruntParser(new StringReader(pl), pigServer);
         grunt.setInteractive(false);
-        pigServer.setBatchOn();
       try {
-            grunt.parseStopOnError(true);
+            grunt.parseStopOnError(false);
         } catch (ParseException e) {
             throw new IOException("Failed to parse query: " + pl, e);
         }
     }
-    
-    private void setPigStats(PigStats stats) {        
+
+    private void setPigStats(PigStats stats) {
         ScriptEngine engine = scriptContext.getScriptEngine();
         if (name != null) {
             engine.setPigStats(name, stats);
@@ -304,28 +301,28 @@ public class BoundScript {
     }
 
     private void setPigStats(List<PigStats> lst) {
-        if (lst == null || lst.isEmpty()) return;        
+        if (lst == null || lst.isEmpty()) return;
         String key = (name != null) ? name : this.toString();
         ScriptEngine engine = scriptContext.getScriptEngine();
         for (PigStats stats : lst) {
             engine.setPigStats(key, stats);
-        } 
+        }
     }
-        
+
     //-------------------------------------------------------------------------
-    
+
     private class MyCallable implements Callable<PigStats> {
-        
+
         private String query = null;
         private PigContext ctx = null;
         private PigProgressNotificationListener adaptor;
-        
+
         public MyCallable(String pl, PigContext ctx, PigProgressNotificationListener adaptor) {
             query = pl;
             this.ctx = ctx;
             this.adaptor = adaptor;
         }
-        
+
         @Override
         public PigStats call() throws Exception {
             LOG.info("Query to run:\n" + query);
@@ -335,15 +332,13 @@ public class BoundScript {
             ScriptState.get().setScript(query);
             ScriptState.get().registerListener(adaptor);
             PigServer pigServer = new PigServer(ctx, true);
-            pigServer.setBatchOn();
             GruntParser grunt = new GruntParser(new StringReader(query), pigServer);
             grunt.setInteractive(false);
             try {
-                grunt.parseStopOnError(true);
+                grunt.parseStopOnError(false);
             } catch (ParseException e) {
                 throw new IOException("Failed to parse script", e);
             }
-            pigServer.executeBatch();
             return PigStats.get();
         }
     }

Modified: pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java?rev=1706711&r1=1706710&r2=1706711&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java Sun Oct  4 18:22:32 2015
@@ -50,8 +50,10 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -249,21 +251,23 @@ public class TestPigServerLocal {
 
     @Test
     public void testSkipParseInRegisterForBatch() throws Throwable {
-        // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
-        // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
-        // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
-        // numTimesSchemaCalled = 4 (once per registerQuery)
         if (Util.getLocalTestMode().toString().startsWith("TEZ")) {
             _testSkipParseInRegisterForBatch(false, 8, 4);
             _testSkipParseInRegisterForBatch(true, 5, 1);
+            _testParseBatchWithScripting(5, 1);
         } else {
+            // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
+            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+            // numTimesSchemaCalled = 4 (once per registerQuery)
             _testSkipParseInRegisterForBatch(false, 10, 4);
+            // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
+            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
+            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
+            // numTimesSchemaCalled = 1 (parseAndBuild)
             _testSkipParseInRegisterForBatch(true, 7, 1);
+            _testParseBatchWithScripting(7, 1);
         }
-        // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
-        // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
-        // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
-        // numTimesSchemaCalled = 1 (parseAndBuild)
     }
 
     @Test
@@ -322,6 +326,53 @@ public class TestPigServerLocal {
         }
 
         assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
+        assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
+        List<Tuple> out = data.get("bar");
+        assertEquals(2, out.size());
+        assertEquals(tuple("a", 1, "b"), out.get(0));
+        assertEquals(tuple("b", 2, "c"), out.get(1));
+    }
+
+    private void _testParseBatchWithScripting(int numTimesInitiated, int numTimesSchemaCalled) throws Throwable {
+        MockTrackingStorage.numTimesInitiated = 0;
+        MockTrackingStorage.numTimesSchemaCalled = 0;
+
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "P = Pig.compile(\"\"\"" +
+                        "A = load 'foo' USING org.apache.pig.test.TestPigServerLocal\\$MockTrackingStorage();" +
+                        "B = order A by $0,$1,$2;" +
+                        "C = LIMIT B 2;" +
+                        "store C into 'bar' USING mock.Storage();" +
+                        "\"\"\")",
+                "Q = P.bind()",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+
+        Properties properties = new Properties();
+        properties.setProperty("io.sort.mb", "2");
+        PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties);
+        PigServer pigServer = new PigServer(pigContext);
+        Data data = resetData(pigContext);
+        data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
+
+        String scriptFile = tempDir + File.separator + "_testParseBatchWithScripting.py";
+        Util.createLocalInputFile(scriptFile , script);
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptFile);
+
+        for (List<PigStats> stats : statsMap.values()) {
+            for (PigStats s : stats) {
+                assertTrue(s.isSuccessful());
+            }
+        }
+
+        assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
         assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
         List<Tuple> out = data.get("bar");
         assertEquals(2, out.size());