You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/04/13 02:10:08 UTC
svn commit: r1091615 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/PigServer.java
src/org/apache/pig/tools/ToolsPigServer.java
src/org/apache/pig/tools/grunt/GruntParser.java
test/org/apache/pig/test/TestToolsPigServer.java
Author: gates
Date: Wed Apr 13 00:10:08 2011
New Revision: 1091615
URL: http://svn.apache.org/viewvc?rev=1091615&view=rev
Log:
PIG-1881 Need a special interface for Penny (Inspector Gadget)
Added:
pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 13 00:10:08 2011
@@ -32,6 +32,9 @@ PIG-1876: Typed map for Pig (daijy)
IMPROVEMENTS
+PIG-1881: Need a special interface for Penny (Inspector Gadget) (laukik via
+gates)
+
PIG-1947: Incorrect line number is reported during parsing(xuefu)
PIG-1918: Line number should be given for logical plan failures (xuefu)
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 13 00:10:08 2011
@@ -44,6 +44,8 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.Assert;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
@@ -122,7 +124,7 @@ import org.apache.pig.tools.pigstats.Pig
@InterfaceStability.Stable
public class PigServer {
- private final Log log = LogFactory.getLog(getClass());
+ protected final Log log = LogFactory.getLog(getClass());
/**
* Given a string, determine the exec type.
@@ -161,14 +163,14 @@ public class PigServer {
* on a new graph. After the nested script is done, the grunt
* shell pops up the saved graph and continues working on it.
*/
- private final Stack<Graph> graphs = new Stack<Graph>();
+ protected final Stack<Graph> graphs = new Stack<Graph>();
/*
* The current Graph the grunt shell is working on.
*/
private Graph currDAG;
- private final PigContext pigContext;
+ protected final PigContext pigContext;
private String jobName;
@@ -176,7 +178,7 @@ public class PigServer {
private final static AtomicInteger scopeCounter = new AtomicInteger(0);
- private final String scope = constructScope();
+ protected final String scope = constructScope();
private boolean isMultiQuery = true;
@@ -366,7 +368,15 @@ public class PigServer {
stats = execute();
}
-
+ return getJobs(stats);
+ }
+
+ /**
+ * Retrieves a list of Job objects from the PigStats object
+ * @param stats
+ * @return A list of ExecJob objects
+ */
+ protected List<ExecJob> getJobs(PigStats stats) {
LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
JobGraph jGraph = stats.getJobGraph();
Iterator<JobStats> iter = jGraph.iterator();
@@ -649,15 +659,38 @@ public class PigServer {
*/
public void registerScript(InputStream in, Map<String,String> params,List<String> paramsFiles) throws IOException {
try {
- // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
- List<String> paramList = new ArrayList<String>();
- if (params!=null){
- for (Map.Entry<String, String> entry:params.entrySet()){
- paramList.add(entry.getKey()+"="+entry.getValue());
- }
- }
+ // Expand parameters first (now shared via doParamSubstitution), then hand
+ // the expanded script text to a non-interactive GruntParser.
+ String substituted = doParamSubstitution(in, params, paramsFiles);
+ GruntParser grunt = new GruntParser(new StringReader(substituted));
+ grunt.setInteractive(false);
+ grunt.setParams(this);
+ grunt.parseStopOnError(true);
+ } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
+ log.error(e.getLocalizedMessage());
+ // NOTE(review): when e carries no cause this throws an IOException whose
+ // message is null; consider wrapping e itself (message + cause).
+ throw new IOException(e.getCause());
+ }
+ }
- // do parameter substitution
+ /**
+ * Do parameter substitution.
+ * @param in The InputStream of file containing Pig Latin to do substitution on.
+ * @param params Parameters to use to substitute
+ * @param paramsFiles Files to use to do substitution.
+ * @return String containing Pig Latin with substitutions done
+ * @throws IOException
+ */
+ protected String doParamSubstitution(InputStream in,
+ Map<String,String> params,
+ List<String> paramsFiles) throws IOException {
+ // transform the map type to list type which can been accepted by ParameterSubstitutionPreprocessor
+ List<String> paramList = new ArrayList<String>();
+ if (params != null) {
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ paramList.add(entry.getKey() + "=" + entry.getValue());
+ }
+ }
+
+ // do parameter substitution
+ try {
ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
StringWriter writer = new StringWriter();
psp.genSubstitutedFile(new BufferedReader(new InputStreamReader(in)),
@@ -665,13 +698,7 @@ public class PigServer {
paramList.size() > 0 ? paramList.toArray(new String[0]) : null,
paramsFiles!=null ? paramsFiles.toArray(new String[0]) : null);
- GruntParser grunt = new GruntParser(new StringReader(writer.toString()));
- grunt.setInteractive(false);
- grunt.setParams(this);
- grunt.parseStopOnError(true);
- } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
- log.error(e.getLocalizedMessage());
- throw new IOException(e.getCause());
+ return writer.toString();
} catch (org.apache.pig.tools.parameters.ParseException e) {
log.error(e.getLocalizedMessage());
throw new IOException(e.getCause());
@@ -679,6 +706,23 @@ public class PigServer {
}
/**
+ * Creates a clone of the current DAG
+ * @return A Graph object which is a clone of the current DAG
+ * @throws IOException
+ */
+    protected Graph getClonedGraph() throws IOException {
+        Graph copy = currDAG.duplicate();
+        if (copy != null) {
+            return copy;
+        }
+        // duplicate() signals a failed re-parse of the cached script by returning null.
+        int errCode = 2127;
+        throw new FrontendException("Cloning of plan failed.", errCode, PigException.BUG);
+    }
+
+
+ /**
* Register a query with the Pig runtime. The query will be read from the indicated file.
* @param fileName file to read query from.
* @throws IOException
@@ -1275,10 +1319,22 @@ public class PigServer {
ScriptState.get().setScriptFeatures( currDAG.lp );
PhysicalPlan pp = compilePp();
+ return launchPlan(pp, "job_pigexec_");
+ }
+
+ /**
+ * A common method for launching the jobs according to the physical plan
+ * @param pp The physical plan
+ * @param jobName A String containing the job name to be used
+ * @return The PigStats object
+ * @throws ExecException
+ * @throws FrontendException
+ */
+ protected PigStats launchPlan(PhysicalPlan pp, String jobName) throws ExecException, FrontendException {
MapReduceLauncher launcher = new MapReduceLauncher();
PigStats stats = null;
try {
- stats = launcher.launchPig(pp, "job_pigexec_", pigContext);
+ stats = launcher.launchPig(pp, jobName, pigContext);
} catch (Exception e) {
// There are a lot of exceptions thrown by the launcher. If this
// is an ExecException, just let it through. Else wrap it.
@@ -1342,7 +1398,7 @@ public class PigServer {
/*
* This class holds the internal states of a grunt shell session.
*/
- private class Graph {
+ protected class Graph {
private final Map<LogicalRelationalOperator, LogicalPlan> aliases = new HashMap<LogicalRelationalOperator, LogicalPlan>();
@@ -1364,7 +1420,7 @@ public class PigServer {
private int currentLineNum = 0;
- Graph(boolean batchMode) {
+ /** Creates a graph backed by a new, empty LogicalPlan, recording the given batch mode. */
+ public Graph(boolean batchMode) {
this.batchMode = batchMode;
this.lp = new LogicalPlan();
};
@@ -1411,6 +1467,22 @@ public class PigServer {
return operators.get( alias );
}
+        /**
+         * Returns the logical plan for an alias, or the complete plan.
+         *
+         * @param alias alias to look up; {@code null} selects the complete plan {@code lp}
+         * @return {@code lp} when alias is null, otherwise the plan recorded in
+         *         {@code aliases} for the operator registered under alias
+         * @throws FrontendException (error 1003) if no operator is registered under alias
+         */
+        public LogicalPlan getPlan(String alias) throws IOException {
+            if (alias == null) {
+                return lp;
+            }
+
+            LogicalRelationalOperator aliasOp = (LogicalRelationalOperator) operators.get(alias);
+            if (aliasOp != null) {
+                return aliases.get(aliasOp);
+            }
+            String msg = "Unable to find an operator for alias " + alias;
+            throw new FrontendException(msg, 1003, PigException.INPUT);
+        }
+
+
/**
* Build a plan for the given alias. Extra branches and child branch under alias
* will be ignored. Dependent branch (i.e. scalar) will be kept.
@@ -1669,5 +1741,48 @@ public class PigServer {
}
}
}
+
+
+        /**
+         * Creates a copy of this graph by re-parsing the cached script into a
+         * fresh Graph, rather than cloning each operator.
+         *
+         * @return the new Graph, or {@code null} if re-parsing the cached script
+         *         failed (the failure is logged with its stack trace)
+         */
+        protected Graph duplicate() {
+            // There are two choices on how we duplicate the logical plan
+            // 1 - we really clone each operator and connect up the cloned operators
+            // 2 - we cache away the script till the point we need to clone
+            //     and then simply re-parse the script.
+            // The latter approach is used here
+            // FIXME: There is one open issue with this now:
+            // Consider the following script:
+            // A = load 'file:/somefile';
+            // B = filter A by $0 > 10;
+            // store B into 'bla';
+            // rm 'file:/somefile';
+            // A = load 'file:/someotherfile'
+            // when we try to clone - we try to reparse
+            // from the beginning and currently the parser
+            // checks for file existence of files in the load
+            // in the case where the file is a local one -i.e. with file: prefix
+            // This will be a known issue now and we will need to revisit later
+
+            // parse each line of the cached script; line numbers start at 1
+            int lineNumber = 1;
+
+            // create data structures needed for parsing.
+            // NOTE(review): processedStores is shared by reference with this graph,
+            // whereas fileNameMap is copied -- confirm the sharing is intended.
+            Graph graph = new Graph(isBatchOn());
+            graph.processedStores = processedStores;
+            graph.fileNameMap = new HashMap<String, String>(fileNameMap);
+
+            try {
+                for (Iterator<String> it = scriptCache.iterator(); it.hasNext(); lineNumber++) {
+                    // always doing registerQuery irrespective of the batch mode
+                    // TODO: Need to figure out if anything different needs to happen if batch
+                    // mode is not on
+                    graph.registerQuery(it.next(), lineNumber);
+                }
+                graph.postProcess();
+            } catch (IOException ioe) {
+                // Report through the server's logger instead of printStackTrace() so the
+                // failure reaches the configured log; callers treat null as "clone failed".
+                log.error("Failed to duplicate the logical plan by re-parsing the cached script", ioe);
+                graph = null;
+            }
+            return graph;
+        }
}
}
Added: pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java?rev=1091615&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java (added)
+++ pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java Wed Apr 13 00:10:08 2011
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.tools.grunt.GruntParser;
+import org.apache.pig.tools.pigstats.PigStats;
+
+/**
+ * ToolsPigServer is a subclass of PigServer intended only for Pig tools. Users
+ * should not use this interface, as we make no promises about its stability or
+ * continued existence.
+ */
+@InterfaceAudience.LimitedPrivate({"Penny"})
+@InterfaceStability.Unstable
+public class ToolsPigServer extends PigServer {
+
+ private PigPlans plans = null;
+
+ /**
+ * @param execTypeString can be 'mapreduce' or 'local'. Local mode will
+ * use Hadoop's local job runner to execute the job on the local machine.
+ * Mapreduce mode will connect to a cluster to execute the job.
+ * @throws ExecException if throws by PigServer
+ * @throws IOException if throws by PigServer
+ */
+ public ToolsPigServer(String execTypeString) throws ExecException, IOException {
+ super(execTypeString);
+ }
+
+ /**
+ * @param execType execution type to start the engine in.
+ * @param properties to use for this run
+ * @throws ExecException if throws by PigServer
+ */
+ public ToolsPigServer(ExecType execType, Properties properties) throws ExecException {
+ super(execType, properties);
+ }
+
+ /**
+ * Register a script without running it. This method is not compatible with
+ * {@link #registerQuery(String)}, {@link #registerScript(String)},
+ * {@link #store(String, String)} or {@link #openIterator(String)}. It can be
+ * used with {@link #getPlans()} and {@link #runPlan(LogicalPlan, String)} in this class
+ * only. The proper control flow is for the caller to call registerNoRun() and then
+ * getPlans() to get a copy of the plans. The user can then modify the
+ * logical plan. It can then be returned via runPlan(), which will execute
+ * the plan.
+ * @param fileName File containing Pig Latin script to register.
+ * @param params the key is the parameter name, and the value is the parameter value
+ * @param paramFiles files which have the parameter setting
+ * @throws IOException if it encounters problems reading the script
+ * @throws FrontendException if it encounters problems parsing the script
+ */
+ public void registerNoRun(String fileName,
+ Map<String, String> params,
+ List<String> paramFiles) throws IOException, FrontendException {
+
+ // Do parameter substitution
+ String substituted = null;
+ FileInputStream fis = null;
+ try{
+ fis = new FileInputStream(fileName);
+ substituted = doParamSubstitution(fis, params, paramFiles);
+ }catch (FileNotFoundException e){
+ log.error(e.getLocalizedMessage());
+ throw new IOException(e.getCause());
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+
+ // Parse in grunt so that register commands are recognized
+ try {
+ GruntParser grunt = new GruntParser(new StringReader(substituted));
+ grunt.setInteractive(false);
+ grunt.setParams(this);
+ setBatchOn();
+ //grunt.setLoadOnly(true);
+ grunt.parseOnly();
+ } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
+ log.error(e.getLocalizedMessage());
+ throw new IOException(e.getCause());
+ }
+
+ Graph g = getClonedGraph();
+ LogicalPlan lp = g.getPlan(null);
+ plans = new PigPlans(lp);
+ }
+
+ /**
+ * Get a class containing the Pig plans. For now it just contains
+ * the new logical plan. At some point in the future it should contain
+ * the MR plan as well.
+ * @return the pig plans.
+ */
+ public PigPlans getPlans() {
+ return plans;
+ }
+
+ /**
+ * Given a (modified) new logical plan, run the script.
+ * @param newPlan plan to run
+ * @param jobName name to give the MR jobs associated with this run
+ * @return list of exec jobs describing the jobs that were run.
+ * @throws FrontendException if plan translation fails.
+ * @throws ExecException if running the job fails.
+ */
+ public List<ExecJob> runPlan(LogicalPlan newPlan,
+ String jobName) throws FrontendException, ExecException {
+
+ HExecutionEngine engine = new HExecutionEngine(pigContext);
+ PhysicalPlan pp = engine.compile(newPlan, null);
+ PigStats stats = launchPlan(pp, jobName);
+ return getJobs(stats);
+ }
+
+ public static class PigPlans {
+
+ public LogicalPlan lp;
+
+ public PigPlans(LogicalPlan lp) {
+ this.lp = lp;
+ }
+ };
+}
+
+
+
+
Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1091615&r1=1091614&r2=1091615&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Wed Apr 13 00:10:08 2011
@@ -176,14 +176,13 @@ public class GruntParser extends PigScri
parse();
}
- if (!sameBatch) {
- executeBatch();
- }
- }
- finally {
- if (!sameBatch) {
- discardBatch();
- }
+ if (!sameBatch) {
+ executeBatch();
+ }
+ } finally {
+ if (!sameBatch) {
+ discardBatch();
+ }
}
int [] res = { mNumSucceededJobs, mNumFailedJobs };
return res;
@@ -234,6 +233,20 @@ public class GruntParser extends PigScri
public boolean isDone() {
return mDone;
}
+
+    /**
+     * Parses the whole script without executing it; added to support Penny.
+     * It repeatedly calls parse() until the parser reports it is done.
+     * Unlike parseStopOnError(), it performs no executeBatch()/discardBatch()
+     * step.
+     *
+     * @throws IllegalStateException if no PigServer is attached to this parser
+     *         (callers such as ToolsPigServer attach one with setParams())
+     * @throws IOException propagated from parse()
+     * @throws ParseException if the script cannot be parsed
+     */
+    public void parseOnly() throws IOException, ParseException {
+        if (mPigServer == null) {
+            throw new IllegalStateException("No PigServer is set; cannot parse the script");
+        }
+
+        mDone = false;
+        while (!mDone) {
+            parse();
+        }
+    }
@Override
protected void processDescribe(String alias) throws IOException {
Added: pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java?rev=1091615&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestToolsPigServer.java Wed Apr 13 00:10:08 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.ToolsPigServer;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestToolsPigServer extends TestCase {
+    private ToolsPigServer pig = null;
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private File stdOutRedirectedFile;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        pig = new ToolsPigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        stdOutRedirectedFile = new File("stdout.redirected");
+        // Remove a leftover from an aborted earlier run: createNewFile()
+        // returns false for an existing file, which would fail every test here.
+        stdOutRedirectedFile.delete();
+        try {
+            if (!stdOutRedirectedFile.createNewFile()) {
+                fail("Unable to create input files");
+            }
+        } catch (IOException e) {
+            fail("Unable to create input files:" + e.getMessage());
+        }
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        pig = null;
+        stdOutRedirectedFile.delete();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testToolsPigServer() throws Exception {
+        Util.createInputFile(cluster, "input", new String[] {
+                "abc\t123",
+                "def\t456",
+                "ghi\t789"});
+
+        File scriptFile = Util.createFile(new String[]{
+                "a = load 'input' using " + PigStorage.class.getName() + "('\t');",
+                "b = foreach a generate $0;",
+                "store b into '/bla';"
+        });
+        registerRunAndVerify(scriptFile);
+    }
+
+    @Test
+    public void testToolsPigServerRegister() throws Exception {
+        // NOTE(review): assumes createLocalInputFile writes relative to the working
+        // directory, which is also where the script's register statement looks.
+        File localJar = new File("testRegisterScripts.jar");
+        Util.createLocalInputFile("testRegisterScripts.jar",
+                new String[]{"mary had a little lamb"});
+        try {
+            Util.createInputFile(cluster, "input1", new String[] {
+                    "abc:123",
+                    "def:456",
+                    "ghi:789"});
+
+            File scriptFile = Util.createFile(new String[]{
+                    "register 'testRegisterScripts.jar';",
+                    "a = load 'input1' using " + PigStorage.class.getName() + "(':');",
+
+                    "b = foreach a generate $0;",
+                    "store b into '/bla1';"
+            });
+            registerRunAndVerify(scriptFile);
+        } finally {
+            localJar.delete();
+        }
+    }
+
+    /**
+     * Registers the script without running it, runs the resulting logical plan,
+     * and checks that exactly one job completed whose output begins with the
+     * record "abc" and contains at least one further record.
+     */
+    private void registerRunAndVerify(File scriptFile) throws Exception {
+        pig.registerNoRun(scriptFile.getAbsolutePath(), null, null);
+
+        ToolsPigServer.PigPlans plans = pig.getPlans();
+
+        assertNotNull(plans);
+        assertNotNull(plans.lp);
+
+        List<ExecJob> jobs = pig.runPlan(plans.lp, "testToolsPigServer");
+
+        assertEquals(1, jobs.size());
+        ExecJob job = jobs.get(0);
+        assertEquals(JOB_STATUS.COMPLETED, job.getStatus());
+        Iterator<Tuple> results = job.getResults();
+        assertTrue(results.hasNext()); // the input has three records
+        Tuple t = results.next();
+        assertEquals("abc", t.get(0).toString());
+        assertTrue(results.hasNext()); // ...so more must follow the first
+    }
+}