Posted to commits@pig.apache.org by ga...@apache.org on 2011/04/13 02:10:08 UTC

svn commit: r1091615 - in /pig/trunk: CHANGES.txt src/org/apache/pig/PigServer.java src/org/apache/pig/tools/ToolsPigServer.java src/org/apache/pig/tools/grunt/GruntParser.java test/org/apache/pig/test/TestToolsPigServer.java

Author: gates
Date: Wed Apr 13 00:10:08 2011
New Revision: 1091615

URL: http://svn.apache.org/viewvc?rev=1091615&view=rev
Log:
PIG-1881 Need a special interface for Penny (Inspector Gadget)

Added:
    pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
    pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 13 00:10:08 2011
@@ -32,6 +32,9 @@ PIG-1876: Typed map for Pig (daijy)
 
 IMPROVEMENTS
 
+PIG-1881: Need a special interface for Penny (Inspector Gadget) (laukik via
+gates)
+
 PIG-1947: Incorrect line number is reported during parsing(xuefu)
 
 PIG1918: Line number should be give for logical plan failures (xuefu)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 13 00:10:08 2011
@@ -44,6 +44,8 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Job;
@@ -122,7 +124,7 @@ import org.apache.pig.tools.pigstats.Pig
 @InterfaceStability.Stable
 public class PigServer {
 
-    private final Log log = LogFactory.getLog(getClass());
+    protected final Log log = LogFactory.getLog(getClass());
 
     /**
      * Given a string, determine the exec type.
@@ -161,14 +163,14 @@ public class PigServer {
      * on a new graph. After the nested script is done, the grunt
      * shell pops up the saved graph and continues working on it.
      */
-    private final Stack<Graph> graphs = new Stack<Graph>();
+    protected final Stack<Graph> graphs = new Stack<Graph>();
 
     /*
      * The current Graph the grunt shell is working on.
      */
     private Graph currDAG;
 
-    private final PigContext pigContext;
+    protected final PigContext pigContext;
     
     private String jobName;
 
@@ -176,7 +178,7 @@ public class PigServer {
 
     private final static AtomicInteger scopeCounter = new AtomicInteger(0);
     
-    private final String scope = constructScope();
+    protected final String scope = constructScope();
 
 
     private boolean isMultiQuery = true;
@@ -366,7 +368,15 @@ public class PigServer {
             stats = execute();
         }
 
-        
+        return getJobs(stats);
+    }
+    
+    /**
+     * Retrieves a list of Job objects from the PigStats object
+     * @param stats PigStats object to retrieve the jobs from
+     * @return A list of ExecJob objects
+     */
+    protected List<ExecJob> getJobs(PigStats stats) {
         LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
         JobGraph jGraph = stats.getJobGraph();
         Iterator<JobStats> iter = jGraph.iterator();
@@ -649,15 +659,38 @@ public class PigServer {
      */
     public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
         try {
-            // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
-            List<String> paramList = new ArrayList<String>();
-            if (params!=null){
-                for (Map.Entry<String, String> entry:params.entrySet()){
-                    paramList.add(entry.getKey()+"="+entry.getValue());
-                }
-            }
+            String substituted = doParamSubstitution(in, params, paramsFiles);
+            GruntParser grunt = new GruntParser(new StringReader(substituted));
+            grunt.setInteractive(false);
+            grunt.setParams(this);
+            grunt.parseStopOnError(true);
+        } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        }
+    }
 
-            // do parameter substitution
+    /**
+     * Do parameter substitution.
+     * @param in The InputStream of the file containing Pig Latin to do substitution on.
+     * @param params Parameters to use to substitute
+     * @param paramsFiles Files to use to do substitution.
+     * @return String containing Pig Latin with substitutions done
+     * @throws IOException
+     */
+    protected String doParamSubstitution(InputStream in,
+                                         Map<String,String> params,
+                                         List<String> paramsFiles) throws IOException {
+        // transform the param map into the list form accepted by ParameterSubstitutionPreprocessor
+        List<String> paramList = new ArrayList<String>();
+        if (params != null) {
+            for (Map.Entry<String, String> entry : params.entrySet()) {
+                paramList.add(entry.getKey() + "=" + entry.getValue());
+             }
+        }
+
+        // do parameter substitution
+        try {
             ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
             StringWriter writer = new StringWriter();
             psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(in)),
@@ -665,13 +698,7 @@ public class PigServer {
                                    paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
                                    paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
 
-            GruntParser grunt = new GruntParser(new StringReader(writer.toString()));
-            grunt.setInteractive(false);
-            grunt.setParams(this);
-            grunt.parseStopOnError(true);
-        } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
-            log.error(e.getLocalizedMessage());
-            throw new IOException(e.getCause());
+            return writer.toString();
         } catch (org.apache.pig.tools.parameters.ParseException e) {
             log.error(e.getLocalizedMessage());
             throw new IOException(e.getCause());
@@ -679,6 +706,23 @@ public class PigServer {
     }
 
     /**
+     * Creates a clone of the current DAG
+     * @return A Graph object which is a clone of the current DAG
+     * @throws IOException
+     */
+    protected Graph getClonedGraph() throws IOException {
+        Graph graph = currDAG.duplicate();
+
+        if (graph == null) {
+            int errCode = 2127;
+            String msg = "Cloning of plan failed.";
+            throw new FrontendException(msg, errCode, PigException.BUG);
+        }
+        return graph;
+    }
+
+
+    /**
      * Register a query with the Pig runtime.  The query will be read from the indicated file.
      * @param fileName file to read query from.
      * @throws IOException
@@ -1275,10 +1319,22 @@ public class PigServer {
         ScriptState.get().setScriptFeatures( currDAG.lp );
         PhysicalPlan pp = compilePp();
        
+        return launchPlan(pp, "job_pigexec_");
+    }
+
+    /**
+     * A common method for launching the jobs according to the physical plan
+     * @param pp The physical plan
+     * @param jobName A String containing the job name to be used
+     * @return The PigStats object
+     * @throws ExecException
+     * @throws FrontendException
+     */
+    protected PigStats launchPlan(PhysicalPlan pp, String jobName) throws ExecException, FrontendException {
         MapReduceLauncher launcher = new MapReduceLauncher();
         PigStats stats = null;
         try {
-            stats = launcher.launchPig(pp, "job_pigexec_", pigContext);
+            stats = launcher.launchPig(pp, jobName, pigContext);
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher.  If this
             // is an ExecException, just let it through.  Else wrap it.
@@ -1342,7 +1398,7 @@ public class PigServer {
     /*
      * This class holds the internal states of a grunt shell session.
      */
-    private class Graph {
+    protected class Graph {
 
         private final Map<LogicalRelationalOperator, LogicalPlan> aliases = new HashMap<LogicalRelationalOperator, LogicalPlan>();
 
@@ -1364,7 +1420,7 @@ public class PigServer {
 
         private int currentLineNum = 0;
         
-        Graph(boolean batchMode) {
+        public Graph(boolean batchMode) {
             this.batchMode = batchMode;
             this.lp = new LogicalPlan();
         };
@@ -1411,6 +1467,22 @@ public class PigServer {
             return operators.get( alias );
         }
 
+        public LogicalPlan getPlan(String alias) throws IOException {
+            LogicalPlan plan = lp;
+
+            if (alias != null) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator) operators.get(alias);
+                if(op == null) {
+                    int errCode = 1003;
+                    String msg = "Unable to find an operator for alias " + alias;
+                    throw new FrontendException(msg, errCode, PigException.INPUT);
+                }
+                plan = aliases.get(op);
+            }
+            return plan;
+        }
+
+
         /**
          * Build a plan for the given alias. Extra branches and child branch under alias
          * will be ignored. Dependent branch (i.e. scalar) will be kept.
@@ -1669,5 +1741,48 @@ public class PigServer {
                 }
             }
         }
+        
+
+        protected Graph duplicate() {
+            // There are two choices on how we duplicate the logical plan
+            // 1 - we really clone each operator and connect up the cloned operators
+            // 2 - we cache away the script till the point we need to clone
+            // and then simply re-parse the script. 
+            // The latter approach is used here
+            // FIXME: There is one open issue with this now:
+            // Consider the following script:
+            // A = load 'file:/somefile';
+            // B = filter A by $0 > 10;
+            // store B into 'bla';
+            // rm 'file:/somefile';
+            // A = load 'file:/someotherfile'
+            // when we try to clone - we try to reparse
+            // from the beginning and currently the parser
+            // checks for file existence of files in the load
+            // in the case where the file is a local one -i.e. with file: prefix
+            // This will be a known issue now and we will need to revisit later
+
+            // parse each line of the cached script
+            int lineNumber = 1;
+
+            // create data structures needed for parsing        
+            Graph graph = new Graph(isBatchOn());
+            graph.processedStores = processedStores;
+            graph.fileNameMap = new HashMap<String, String>(fileNameMap);
+
+            try {
+                for (Iterator<String> it = scriptCache.iterator(); it.hasNext(); lineNumber++) {
+                    // always doing registerQuery irrespective of the batch mode
+                    // TODO: Need to figure out if anything different needs to happen if batch
+                    // mode is not on
+                    graph.registerQuery(it.next(), lineNumber);
+                }
+                graph.postProcess();
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+                graph = null;
+            }
+            return graph;
+        }
     }
 }

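For callers, the net effect of the registerScript()/doParamSubstitution() split above is unchanged: registerScript() still takes a parameter map, turns each entry into a "key=value" string, and runs the stream through ParameterSubstitutionPreprocessor before handing the substituted text to GruntParser. A minimal caller-side sketch (the script path and parameter name are made up for illustration, and local mode is assumed):

    import java.io.FileInputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class RegisterScriptExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);

            // Every occurrence of $INPUT in the script is replaced with the value
            // below by ParameterSubstitutionPreprocessor before the script is parsed.
            Map<String, String> params = new HashMap<String, String>();
            params.put("INPUT", "/tmp/students.txt");

            // e.g. myscript.pig might contain:  a = load '$INPUT'; dump a;
            pig.registerScript(new FileInputStream("myscript.pig"), params, null);
        }
    }
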
Added: pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java?rev=1091615&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java (added)
+++ pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java Wed Apr 13 00:10:08 2011
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
+
+/**
+ * ToolsPigServer is a subclass of PigServer intended only for Pig tools.  Users
+ * should not use this interface, as we make no promises about its stability or
+ * continued existence.
+ */
+@InterfaceAudience.LimitedPrivate({"Penny"})
+@InterfaceStability.Unstable
+public class ToolsPigServer extends PigServer {
+
+    private PigPlans plans = null;
+
+    /**
+     * @param execTypeString can be 'mapreduce' or 'local'.  Local mode will 
+     * use Hadoop's local job runner to execute the job on the local machine.
+     * Mapreduce mode will connect to a cluster to execute the job.
+     * @throws ExecException if thrown by PigServer
+     * @throws IOException if thrown by PigServer
+     */
+    public ToolsPigServer(String execTypeString) throws ExecException, IOException {
+        super(execTypeString);
+    }
+
+    /**
+     * @param execType execution type to start the engine in.
+     * @param properties to use for this run
+     * @throws ExecException if thrown by PigServer
+     */
+    public ToolsPigServer(ExecType execType, Properties properties) throws ExecException {
+        super(execType, properties);
+    }
+
+    /**
+     * Register a script without running it.  This method is not compatible with
+     * {@link #registerQuery(String)}, {@link #registerScript(String)}, 
+     * {@link #store(String, String)} or {@link #openIterator(String)}.  It can be
+     * used with {@link #getPlans()} and {@link #runPlan(LogicalPlan, String)} in this class 
+     * only. The proper control flow is for the caller to call registerNoRun() and then
+     * getPlans() to get a copy of the plans.  The user can then modify the
+     * logical plan.  The modified plan can then be passed to runPlan(), which will
+     * execute it.
+     * @param fileName File containing Pig Latin script to register.
+     * @param params  the key is the parameter name, and the value is the parameter value
+     * @param paramFiles   files which have the parameter setting
+     * @throws IOException if it encounters problems reading the script
+     * @throws FrontendException if it encounters problems parsing the script
+     */
+    public void registerNoRun(String fileName,
+                              Map<String, String> params,
+                              List<String> paramFiles) throws IOException, FrontendException {
+
+        // Do parameter substitution
+        String substituted = null;
+        FileInputStream fis = null;
+        try{
+            fis = new FileInputStream(fileName);
+            substituted = doParamSubstitution(fis, params, paramFiles);
+        }catch (FileNotFoundException e){
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        } finally {
+            if (fis != null) {
+                fis.close();
+            }
+        }
+
+        // Parse in grunt so that register commands are recognized
+        try {
+            GruntParser grunt = new GruntParser(new StringReader(substituted));
+            grunt.setInteractive(false);
+            grunt.setParams(this);
+            setBatchOn();
+            //grunt.setLoadOnly(true);
+            grunt.parseOnly();
+        } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
+            log.error(e.getLocalizedMessage());
+            throw new IOException(e.getCause());
+        }
+
+        Graph g = getClonedGraph();
+        LogicalPlan lp = g.getPlan(null);
+        plans = new PigPlans(lp);
+    }
+
+    /**
+     * Get a class containing the Pig plans.  For now it just contains
+     * the new logical plan.  At some point in the future it should contain
+     * the MR plan as well.
+     * @return the pig plans.
+     */
+    public PigPlans getPlans() {
+        return plans;
+    }
+
+    /**
+     * Given a (modified) new logical plan, run the script.
+     * @param newPlan plan to run
+     * @param jobName name to give the MR jobs associated with this run
+     * @return list of exec jobs describing the jobs that were run.
+     * @throws FrontendException if plan translation fails.
+     * @throws ExecException if running the job fails.
+     */
+    public List<ExecJob> runPlan(LogicalPlan newPlan,
+                                 String jobName) throws FrontendException, ExecException {
+    	
+        HExecutionEngine engine = new HExecutionEngine(pigContext);
+        PhysicalPlan pp = engine.compile(newPlan, null);
+        PigStats stats = launchPlan(pp, jobName);
+        return getJobs(stats);                        
+    }
+            
+    public static class PigPlans {
+
+        public LogicalPlan lp;
+
+        public PigPlans(LogicalPlan lp) {
+            this.lp = lp;
+        }
+    };
+}
+
+
+
+

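To make the control flow described in the registerNoRun() javadoc concrete, here is a minimal caller-side sketch of the new interface (the script path, execution type, and job name are hypothetical, and error handling is omitted):

    import java.util.List;

    import org.apache.pig.backend.executionengine.ExecJob;
    import org.apache.pig.newplan.logical.relational.LogicalPlan;
    import org.apache.pig.tools.ToolsPigServer;

    public class PennyLikeTool {
        public static void main(String[] args) throws Exception {
            ToolsPigServer pig = new ToolsPigServer("mapreduce");

            // Parse and register the script without running it.
            pig.registerNoRun("/path/to/script.pig", null, null);

            // Grab a copy of the plans; for now this only carries the new logical plan.
            ToolsPigServer.PigPlans plans = pig.getPlans();
            LogicalPlan lp = plans.lp;

            // ... a tool such as Penny could instrument or rewrite lp here ...

            // Hand the (possibly modified) plan back to be compiled and launched.
            List<ExecJob> jobs = pig.runPlan(lp, "penny_instrumented_run");
            System.out.println("jobs launched: " + jobs.size());
        }
    }

The TestToolsPigServer test added below exercises this same registerNoRun()/getPlans()/runPlan() flow against a MiniCluster.
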
Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Wed Apr 13 00:10:08 2011
@@ -176,14 +176,13 @@ public class GruntParser extends PigScri
                 parse();
             }
             
-	    if (!sameBatch) {
-		executeBatch();
-	    }
-        } 
-        finally {
-	    if (!sameBatch) {
-		discardBatch();
-	    }
+            if (!sameBatch) {
+                executeBatch();
+            }
+        } finally {
+            if (!sameBatch) {
+                discardBatch();
+            }
         }
         int [] res = { mNumSucceededJobs, mNumFailedJobs };
         return res;
@@ -234,6 +233,20 @@ public class GruntParser extends PigScri
     public boolean isDone() {
         return mDone;
     }
+
+    /*
+     * parseOnly method added to support Penny
+     */
+    public void parseOnly() throws IOException, ParseException {
+        if (mPigServer == null) {
+            throw new IllegalStateException();
+        }
+
+        mDone = false;
+        while(!mDone) {
+            parse();
+        }
+    }    
     
     @Override
     protected void processDescribe(String alias) throws IOException {

Added: pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java?rev=1091615&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java Wed Apr 13 00:10:08 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.ToolsPigServer;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestToolsPigServer extends TestCase {
+    private ToolsPigServer pig = null;
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private File stdOutRedirectedFile;
+
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        pig = new ToolsPigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        stdOutRedirectedFile = new File("stdout.redirected");
+        // Create file if it does not exist
+        try {
+            if(!stdOutRedirectedFile.createNewFile())
+                fail("Unable to create stdout redirect file");
+        } catch (IOException e) {
+            fail("Unable to create stdout redirect file: " + e.getMessage());
+        }
+    }
+    
+    @After
+    @Override
+    public void tearDown() throws Exception{
+        pig = null;
+        stdOutRedirectedFile.delete();
+    }
+    
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+    
+    @Test
+    public void testToolsPigServer() throws Exception {
+        Util.createInputFile(cluster, "input", new String[] {
+                "abc\t123",
+                "def\t456",
+                "ghi\t789"});
+
+        File scriptFile = Util.createFile(new String[]{
+                "a = load 'input' using " + PigStorage.class.getName() + "('\t');",
+                "b = foreach a generate $0;",
+                "store b into '/bla';"
+                });
+        pig.registerNoRun(scriptFile.getAbsolutePath(), null, null);
+
+        ToolsPigServer.PigPlans plans = pig.getPlans();
+
+        assertNotNull(plans);
+        assertNotNull(plans.lp);
+
+        List<ExecJob> jobs = pig.runPlan(plans.lp, "testToolsPigServer");
+
+        assertEquals(1, jobs.size());
+        ExecJob job = jobs.get(0);
+        assertEquals(JOB_STATUS.COMPLETED, job.getStatus());
+        Iterator<Tuple> results = job.getResults();
+        assertTrue(results.hasNext()); // should have one record
+        Tuple t = results.next();
+        assertEquals("abc", t.get(0).toString());
+        assertTrue(results.hasNext());
+    }
+
+    @Test
+    public void testToolsPigServerRegister() throws Exception {
+        Util.createLocalInputFile("testRegisterScripts.jar",
+            new String[]{"mary had a little lamb"});
+
+        Util.createInputFile(cluster, "input1", new String[] {
+                "abc:123",
+                "def:456",
+                "ghi:789"});
+
+        File scriptFile = Util.createFile(new String[]{
+                "register 'testRegisterScripts.jar';",
+                "a = load 'input1' using " + PigStorage.class.getName() + "(':');",
+
+                "b = foreach a generate $0;",
+                "store b into '/bla1';"
+                });
+        pig.registerNoRun(scriptFile.getAbsolutePath(), null, null);
+
+        ToolsPigServer.PigPlans plans = pig.getPlans();
+
+        assertNotNull(plans);
+        assertNotNull(plans.lp);
+
+        List<ExecJob> jobs = pig.runPlan(plans.lp, "testToolsPigServer");
+
+        assertEquals(1, jobs.size());
+        ExecJob job = jobs.get(0);
+        assertEquals(JOB_STATUS.COMPLETED, job.getStatus());
+        Iterator<Tuple> results = job.getResults();
+        assertTrue(results.hasNext()); // should have one record
+        Tuple t = results.next();
+        assertEquals("abc", t.get(0).toString());
+        assertTrue(results.hasNext());
+    }
+
+
+}