You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/02/25 06:18:48 UTC
svn commit: r747660 [1/2] - in /hadoop/pig/branches/multiquery: ./
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/...
Author: olga
Date: Wed Feb 25 05:18:47 2009
New Revision: 747660
URL: http://svn.apache.org/viewvc?rev=747660&view=rev
Log:
PIG-627: multiquery support M1 (hagleitn via olgan)
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/PlanDumper.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd2
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test.ppf
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test_broken.ppf
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsub.pig
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_exec.pig
hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_run.pig
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestPigScriptParser.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Feb 25 05:18:47 2009
@@ -406,3 +406,5 @@
storing strings > 65536 bytes (in UTF8 form) using BinStorage() (sms)
PIG-642: Limit after FRJ causes problems (daijy)
+
+ PIG-627: multiquery support M1 (hagleitn via olgan)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Wed Feb 25 05:18:47 2009
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,7 +52,6 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.PlanSetter;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -95,23 +95,38 @@
throw new PigException(msg, errCode, PigException.BUG);
}
-
- Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
- Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
- Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
- PigContext pigContext;
+ /*
+ * The data structure to support grunt shell operations.
+ * The grunt shell can only work on one graph at a time.
+ * If a script is contained inside another script, the grunt
+ * shell first saves the current graph on the stack and works
+ * on a new graph. After the nested script is done, the grunt
+ * shell pops up the saved graph and continues working on it.
+ */
+ private Stack<Graph> graphs = new Stack<Graph>();
+
+ /*
+ * The current Graph the grunt shell is working on.
+ */
+ private Graph currDAG;
+
+ private PigContext pigContext;
+ private static int scopeCounter = 0;
private String scope = constructScope();
- private ArrayList<String> cachedScript = new ArrayList<String>();
-
+
private String constructScope() {
// scope serves for now as a session id
- // scope = user_id + "-" + time_stamp;
- String user = System.getProperty("user.name", "DEFAULT_USER_ID");
- String date = (new Date()).toString();
-
- return user + "-" + date;
+ // String user = System.getProperty("user.name", "DEFAULT_USER_ID");
+ // String date = (new Date()).toString();
+
+ // scope is not really used in the system right now. It will
+ // however make your explain statements look lengthy if set to
+ // username-date. For now let's simplify the scope, if a real
+ // scope is needed again, we might need to update all the
+ // operators to not include scope in their name().
+ return ""+(++scopeCounter);
}
public PigServer(String execTypeString) throws ExecException, IOException {
@@ -123,21 +138,15 @@
}
public PigServer(ExecType execType, Properties properties) throws ExecException {
- this.pigContext = new PigContext(execType, properties);
- if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
- setJobName("DefaultJobName") ;
- }
- pigContext.connect();
+ this(new PigContext(execType, properties));
}
-
+
public PigServer(PigContext context) throws ExecException {
this.pigContext = context;
- if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
- setJobName("DefaultJobName") ;
- }
+ currDAG = new Graph(false);
pigContext.connect();
}
-
+
public PigContext getPigContext(){
return pigContext;
}
@@ -149,7 +158,61 @@
public void debugOff() {
pigContext.debug = false;
}
-
+
+ /**
+ * Starts batch execution mode.
+ */
+ public void setBatchOn() {
+ log.info("Create a new graph.");
+
+ if (currDAG != null) {
+ graphs.push(currDAG);
+ }
+ currDAG = new Graph(true);
+ }
+
+ /**
+ * Retrieve the current execution mode.
+ *
+ * @return true if the execution mode is batch; false otherwise.
+ */
+ public boolean isBatchOn() {
+ return currDAG.isBatchOn();
+ }
+
+ /**
+ * Submits a batch of Pig commands for execution.
+ *
+ * @throws FrontendException
+ * @throws ExecException
+ */
+ public void executeBatch() throws FrontendException, ExecException {
+ if (currDAG == null || !isBatchOn() || graphs.size() < 1) {
+ throw new IllegalStateException("setBatchOn() must be called first.");
+ }
+
+ try {
+ currDAG.execute();
+ } finally {
+ log.info("Delete the current graph.");
+ currDAG = graphs.pop();
+ }
+ }
+
+ /**
+ * Discards a batch of Pig commands.
+ *
+ * @throws FrontendException
+ * @throws ExecException
+ */
+ public void discardBatch() throws FrontendException {
+ if (currDAG == null || !isBatchOn() || graphs.size() < 1) {
+ throw new IllegalStateException("setBatchOn() must be called first.");
+ }
+
+ currDAG = graphs.pop();
+ }
+
/**
* Add a path to be skipped while automatically shipping binaries for
* streaming.
@@ -264,53 +327,10 @@
* line number of the query within the whole script
* @throws IOException
*/
- public void registerQuery(String query, int startLine) throws IOException {
-
- LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, aliasOp);
- // store away the query for use in cloning later
- cachedScript .add(query);
-
- if (lp.getLeaves().size() == 1)
- {
- LogicalOperator op = lp.getSingleLeafPlanOutputOp();
- // No need to do anything about DEFINE
- if (op instanceof LODefine) {
- return;
- }
-
- // Check if we just processed a LOStore i.e. STORE
- if (op instanceof LOStore) {
- try{
- execute(null);
- } catch (Exception e) {
- int errCode = 1002;
- String msg = "Unable to store alias " + op.getOperatorKey().getId();
- throw new FrontendException(msg, errCode, PigException.INPUT, e);
- }
- }
- }
+ public void registerQuery(String query, int startLine) throws IOException {
+ currDAG.registerQuery(query, startLine);
}
-
- private LogicalPlan parseQuery(String query, int startLine, Map<LogicalOperator, LogicalPlan> aliasesMap,
- Map<OperatorKey, LogicalOperator> opTableMap, Map<String, LogicalOperator> aliasOpMap) throws IOException {
- if(query != null) {
- query = query.trim();
- if(query.length() == 0) return null;
- }else {
- return null;
- }
- try {
- return new LogicalPlanBuilder(pigContext).parse(scope, query,
- aliasesMap, opTableMap, aliasOpMap, startLine);
- } catch (ParseException e) {
- //throw (IOException) new IOException(e.getMessage()).initCause(e);
- PigException pe = Utils.getPigException(e);
- int errCode = 1000;
- String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
- throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
- }
- }
-
+
public LogicalPlan clonePlan(String alias) throws IOException {
// There are two choices on how we clone the logical plan
// 1 - we really clone each operator and connect up the cloned operators
@@ -334,28 +354,39 @@
// final logical plan is the clone that we want
LogicalPlan lp = null;
int lineNumber = 1;
- // create data structures needed for parsing
- Map<LogicalOperator, LogicalPlan> cloneAliases = new HashMap<LogicalOperator, LogicalPlan>();
- Map<OperatorKey, LogicalOperator> cloneOpTable = new HashMap<OperatorKey, LogicalOperator>();
- Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, LogicalOperator>();
- for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); lineNumber++) {
- lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, cloneAliasOp);
+
+ // create data structures needed for parsing
+ Graph graph = new Graph(true);
+
+ for (Iterator<String> it = currDAG.getScriptCache().iterator(); it.hasNext(); lineNumber++) {
+ if (isBatchOn()) {
+ graph.registerQuery(it.next(), lineNumber);
+ } else {
+ lp = graph.parseQuery(it.next(), lineNumber);
+ }
}
if(alias == null) {
// a store prompted the execution - so return
// the entire logical plan
+ if (isBatchOn()) {
+ lp = new LogicalPlan();
+ for (LogicalPlan lpPart : graph.getStoreOpTable().values()) {
+ lp.mergeSharedPlan(lpPart);
+ }
+ }
+
return lp;
} else {
// return the logical plan corresponding to the
// alias supplied
- LogicalOperator op = cloneAliasOp.get(alias);
+ LogicalOperator op = graph.getAliasOp().get(alias);
if(op == null) {
int errCode = 1003;
String msg = "Unable to find an operator for alias " + alias;
throw new FrontendException(msg, errCode, PigException.INPUT);
}
- return cloneAliases.get(op);
+ return graph.getAliases().get(op);
}
}
@@ -396,7 +427,7 @@
}
public void setJobName(String name){
- pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
+ currDAG.setJobName(name);
}
/**
@@ -404,15 +435,30 @@
* result
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
+ if (isBatchOn()) {
+ log.info("Skip DUMP command in batch mode.");
+ return new Iterator<Tuple>() {
+ public boolean hasNext() {
+ return false;
+ }
+ public Tuple next() {
+ return null;
+ }
+ public void remove() {
+ }
+ };
+ }
+
try {
- LogicalOperator op = aliasOp.get(id);
+ LogicalOperator op = currDAG.getAliasOp().get(id);
if(null == op) {
int errCode = 1003;
String msg = "Unable to find an operator for alias " + id;
throw new FrontendException(msg, errCode, PigException.INPUT);
}
-// ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+
ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
+
// invocation of "execute" is synchronous!
if (job.getStatus() == JOB_STATUS.COMPLETED) {
return job.getResults();
@@ -445,7 +491,7 @@
String id,
String filename,
String func) throws IOException{
- if (!aliasOp.containsKey(id))
+ if (!currDAG.getAliasOp().containsKey(id))
throw new IOException("Invalid alias: " + id);
try {
@@ -502,38 +548,65 @@
*/
public void explain(String alias,
PrintStream stream) throws IOException {
+ explain(alias, "text", true, stream, stream, stream);
+ }
+
+ /**
+ * Provide information on how a pig query will be executed.
+ * @param alias Name of alias to explain.
+ * @param format Format in which the explain should be printed
+ * @param verbose Controls the amount of information printed
+ * @param dir Directory to print the different plans into
+ * @throws IOException if the requested alias cannot be found.
+ */
+ public void explain(String alias,
+ String format,
+ boolean verbose,
+ String dir) throws IOException {
try {
- LogicalPlan lp = compileLp(alias);
+ PrintStream lps = new PrintStream(new File(dir,"logical_plan."+format));
+ PrintStream pps = new PrintStream(new File(dir,"physical_plan."+format));
+ PrintStream eps = new PrintStream(new File(dir,"exec_plan."+format));
+ explain(alias, format, verbose, lps, pps, eps);
+ lps.close();
+ pps.close();
+ eps.close();
- // MRCompiler needs a store to be the leaf - hence
- // add a store to the plan to explain
-
- // figure out the leaf to which the store needs to be added
- List<LogicalOperator> leaves = lp.getLeaves();
- LogicalOperator leaf = null;
- if(leaves.size() == 1) {
- leaf = leaves.get(0);
- } else {
- for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
- LogicalOperator leafOp = it.next();
- if(leafOp.getAlias().equals(alias))
- leaf = leafOp;
- }
- }
-
- LogicalPlan storePlan = QueryParser.generateStorePlan(
- scope, lp, "fakefile", PigStorage.class.getName(), leaf);
- stream.println("Logical Plan:");
- LOPrinter lv = new LOPrinter(stream, storePlan);
- lv.visit();
-
- PhysicalPlan pp = compilePp(storePlan);
- stream.println("-----------------------------------------------");
- stream.println("Physical Plan:");
+ } catch (Exception e) {
+ int errCode = 1067;
+ String msg = "Unable to explain alias " + alias;
+ throw new FrontendException(msg, errCode, PigException.INPUT, e);
+ }
+ }
- stream.println("-----------------------------------------------");
- pigContext.getExecutionEngine().explain(pp, stream);
-
+ /**
+ * Provide information on how a pig query will be executed.
+ * @param alias Name of alias to explain.
+ * @param format Format in which the explain should be printed
+ * @param verbose Controls the amount of information printed
+ * @param lps Stream to print the logical tree
+ * @param pps Stream to print the physical tree
+ * @param eps Stream to print the execution tree
+ * @throws IOException if the requested alias cannot be found.
+ */
+ public void explain(String alias,
+ String format,
+ boolean verbose,
+ PrintStream lps,
+ PrintStream pps,
+ PrintStream eps) throws IOException {
+ try {
+ LogicalPlan lp = getStorePlan(alias);
+ if (lp.size() == 0) {
+ lps.println("Logical plan is empty.");
+ pps.println("Physical plan is empty.");
+ eps.println("Execution plan is empty.");
+ return;
+ }
+ PhysicalPlan pp = compilePp(lp);
+ lp.explain(lps, format, verbose);
+ pp.explain(pps, format, verbose);
+ pigContext.getExecutionEngine().explain(pp, eps, format, verbose);
} catch (Exception e) {
int errCode = 1067;
String msg = "Unable to explain alias " + alias;
@@ -633,10 +706,10 @@
public Map<String, LogicalPlan> getAliases() {
Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
- for(LogicalOperator op: this.aliases.keySet()) {
+ for(LogicalOperator op: currDAG.getAliases().keySet()) {
String alias = op.getAlias();
if(null != alias) {
- aliasPlans.put(alias, this.aliases.get(op));
+ aliasPlans.put(alias, currDAG.getAliases().get(op));
}
}
return aliasPlans;
@@ -652,7 +725,6 @@
}
public Map<LogicalOperator, DataBag> getExamples(String alias) {
- //LogicalPlan plan = aliases.get(aliasOp.get(alias));
LogicalPlan plan = null;
try {
plan = clonePlan(alias);
@@ -665,15 +737,48 @@
ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
return exgen.getExamples();
}
+
+ private LogicalPlan getStorePlan(String alias) throws IOException {
+ LogicalPlan lp = compileLp(alias);
+
+ if (!isBatchOn() || alias != null) {
+ // MRCompiler needs a store to be the leaf - hence
+ // add a store to the plan to explain
+
+ // figure out the leaves to which stores need to be added
+ List<LogicalOperator> leaves = lp.getLeaves();
+ LogicalOperator leaf = null;
+ if(leaves.size() == 1) {
+ leaf = leaves.get(0);
+ } else {
+ for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+ LogicalOperator leafOp = it.next();
+ if(leafOp.getAlias().equals(alias))
+ leaf = leafOp;
+ }
+ }
+
+ lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
+ PigStorage.class.getName(), leaf);
+ }
+
+ return lp;
+ }
private ExecJob execute(String alias) throws FrontendException, ExecException {
- ExecJob job = null;
-// lp.explain(System.out, System.err);
LogicalPlan typeCheckedLp = compileLp(alias);
-
+
+ if (typeCheckedLp.size() == 0) {
+ return null;
+ }
+
+ LogicalOperator op = typeCheckedLp.getLeaves().get(0);
+ if (op instanceof LODefine) {
+ log.info("Skip execution of DEFINE only logical plan.");
+ return null;
+ }
+
return executeCompiledLogicalPlan(typeCheckedLp);
-// typeCheckedLp.explain(System.out, System.err);
-
}
private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
@@ -698,14 +803,14 @@
// create a clone of the logical plan and give it
// to the operations below
LogicalPlan lpClone;
+
try {
- lpClone = clonePlan(alias);
+ lpClone = clonePlan(alias);
} catch (IOException e) {
int errCode = 2001;
String msg = "Unable to clone plan before compiling";
throw new FrontendException(msg, errCode, PigException.BUG, e);
}
-
// Set the logical plan values correctly in all the operators
PlanSetter ps = new PlanSetter(lpClone);
@@ -777,13 +882,13 @@
private LogicalPlan getPlanFromAlias(
String alias,
String operation) throws FrontendException {
- LogicalOperator lo = aliasOp.get(alias);
+ LogicalOperator lo = currDAG.getAliasOp().get(alias);
if (lo == null) {
int errCode = 1004;
String msg = "No alias " + alias + " to " + operation;
throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
}
- LogicalPlan lp = aliases.get(lo);
+ LogicalPlan lp = currDAG.getAliases().get(lo);
if (lp == null) {
int errCode = 1005;
String msg = "No plan for " + alias + " to " + operation;
@@ -792,5 +897,101 @@
return lp;
}
+ /*
+ * This class holds the internal states of a grunt shell session.
+ */
+ private class Graph {
+
+ private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+
+ private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+
+ private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+
+ private ArrayList<String> scriptCache = new ArrayList<String>();
+
+ private Map<LogicalOperator, LogicalPlan> storeOpTable = new HashMap<LogicalOperator, LogicalPlan>();
+
+ private String jobName;
+
+ private boolean batchMode;
+
+
+ Graph(boolean batchMode) {
+ this.batchMode = batchMode;
+ this.jobName = "DefaultJobName";
+ };
+
+
+ Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
+
+ Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
+
+ Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
+
+ ArrayList<String> getScriptCache() { return scriptCache; }
+
+ Map<LogicalOperator, LogicalPlan> getStoreOpTable() { return storeOpTable; }
+
+ boolean isBatchOn() { return batchMode; };
+
+ void execute() throws ExecException, FrontendException {
+ pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + jobName);
+ PigServer.this.execute(null);
+ }
+
+ void setJobName(String name) {
+ jobName = name;
+ }
+ void registerQuery(String query, int startLine) throws IOException {
+
+ LogicalPlan lp = parseQuery(query, startLine);
+
+ // store away the query for use in cloning later
+ scriptCache.add(query);
+ if (lp.getLeaves().size() == 1) {
+ LogicalOperator op = lp.getSingleLeafPlanOutputOp();
+
+ // Check if we just processed a LOStore i.e. STORE
+ if (op instanceof LOStore) {
+
+ if (!batchMode) {
+ try {
+ execute();
+ } catch (Exception e) {
+ int errCode = 1002;
+ String msg = "Unable to store alias "
+ + op.getOperatorKey().getId();
+ throw new FrontendException(msg, errCode,
+ PigException.INPUT, e);
+ }
+ } else {
+ storeOpTable.put(op, lp);
+ }
+
+ }
+ }
+ }
+
+ LogicalPlan parseQuery(String query, int startLine) throws IOException {
+ if (query != null) {
+ query = query.trim();
+ }
+
+ if (query == null || query.length() == 0) {
+ throw new IllegalArgumentException();
+ }
+
+ try {
+ return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
+ aliases, opTable, aliasOp, startLine);
+ } catch (ParseException e) {
+ PigException pe = Utils.getPigException(e);
+ int errCode = 1000;
+ String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
+ throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
+ }
+ }
+ }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -123,8 +123,11 @@
*
* @param plan PhysicalPlan to explain
* @param stream Stream to print output to
+ * @param format Format to print in
+ * @param verbose Amount of information to print
*/
- public void explain(PhysicalPlan plan, PrintStream stream);
+ public void explain(PhysicalPlan plan, PrintStream stream,
+ String format, boolean verbose);
/**
* Return currently running jobs (can be useful for admin purposes)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -273,16 +273,13 @@
throw new UnsupportedOperationException();
}
- public void explain(PhysicalPlan plan, PrintStream stream) {
+ public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
try {
- PlanPrinter printer = new PlanPrinter(plan, stream);
- printer.visit();
- stream.println();
-
ExecTools.checkLeafIsStore(plan, pigContext);
MapReduceLauncher launcher = new MapReduceLauncher();
- launcher.explain(plan, pigContext, stream);
+ launcher.explain(plan, pigContext, stream, format, verbose);
+
} catch (Exception ve) {
throw new RuntimeException(ve);
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Feb 25 05:18:47 2009
@@ -88,13 +88,17 @@
* @param pp PhysicalPlan to explain
* @param pc PigContext to use for configuration
* @param ps PrintStream to write output on.
+ * @param format Format to write in
+ * @param verbose Amount of information to print
* @throws VisitorException
* @throws IOException
*/
public abstract void explain(
PhysicalPlan pp,
PigContext pc,
- PrintStream ps) throws PlanException,
+ PrintStream ps,
+ String format,
+ boolean verbose) throws PlanException,
VisitorException,
IOException;
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Wed Feb 25 05:18:47 2009
@@ -38,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -117,13 +118,22 @@
public void explain(
PhysicalPlan php,
PigContext pc,
- PrintStream ps) throws PlanException, VisitorException,
- IOException {
+ PrintStream ps,
+ String format,
+ boolean verbose) throws PlanException, VisitorException,
+ IOException {
log.trace("Entering LocalLauncher.explain");
MROperPlan mrp = compile(php, pc);
- MRPrinter printer = new MRPrinter(ps, mrp);
- printer.visit();
+ if (format.equals("text")) {
+ MRPrinter printer = new MRPrinter(ps, mrp);
+ printer.setVerbose(verbose);
+ printer.visit();
+ } else {
+ DotMRPrinter printer =new DotMRPrinter(mrp, ps);
+ printer.setVerbose(verbose);
+ printer.dump();
+ }
}
private MROperPlan compile(
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 25 05:18:47 2009
@@ -197,15 +197,21 @@
*/
public MROperPlan compile() throws IOException, PlanException, VisitorException {
List<PhysicalOperator> leaves = plan.getLeaves();
- if(!(leaves.get(0) instanceof POStore)) {
- int errCode = 2025;
- String msg = "Expected leaf of reduce plan to " +
- "always be POStore. Found " + leaves.get(0).getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG);
- }
- POStore store = (POStore)leaves.get(0);
- FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
- compile(store);
+
+ for (PhysicalOperator op : leaves) {
+ if (!(op instanceof POStore)) {
+ int errCode = 2025;
+ String msg = "Expected leaf of reduce plan to " +
+ "always be POStore. Found " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ for (PhysicalOperator op : leaves) {
+ POStore store = (POStore)op;
+ FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
+ compile(store);
+ }
// I'm quite certain this is not the best way to do this. The issue
// is that for jobs that take multiple map reduce passes, for
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 25 05:18:47 2009
@@ -35,6 +35,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -115,13 +116,26 @@
public void explain(
PhysicalPlan php,
PigContext pc,
- PrintStream ps) throws PlanException, VisitorException,
+ PrintStream ps,
+ String format,
+ boolean verbose) throws PlanException, VisitorException,
IOException {
log.trace("Entering MapReduceLauncher.explain");
MROperPlan mrp = compile(php, pc);
- MRPrinter printer = new MRPrinter(ps, mrp);
- printer.visit();
+ if (format.equals("text")) {
+ MRPrinter printer = new MRPrinter(ps, mrp);
+ printer.setVerbose(verbose);
+ printer.visit();
+ } else {
+ ps.println("#--------------------------------------------------");
+ ps.println("# Map Reduce Plan ");
+ ps.println("#--------------------------------------------------");
+
+ DotMRPrinter printer =new DotMRPrinter(mrp, ps);
+ printer.setVerbose(verbose);
+ printer.dump();
+ }
}
private MROperPlan compile(
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Feb 25 05:18:47 2009
@@ -323,4 +323,8 @@
public void setReplFiles(FileSpec[] replFiles) {
this.replFiles = replFiles;
}
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
}
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanException;
+
/**
 * This class can print an MR plan in the DOT format. It uses
 * clusters to illustrate nesting. If "verbose" is off, it will skip
 * any nesting in the associated physical plans.
 */
public class DotMRPrinter extends DotPlanDumper<MapReduceOper, MROperPlan,
                                  DotMRPrinter.InnerOperator,
                                  DotMRPrinter.InnerPlan> {

    // Shared id source for InnerOperator instances; the value is handed out
    // as each operator's hashCode so distinct inner operators never collide.
    // NOTE(review): static and unsynchronized — assumes plans are dumped from
    // a single thread; confirm.
    static int counter = 0;

    // Whether nested physical plans are expanded when dumping. The outer MR
    // level itself always stays verbose (see setVerbose below).
    boolean isVerboseNesting = true;

    /**
     * @param plan MR plan to dump
     * @param ps stream the DOT output is written to
     */
    public DotMRPrinter(MROperPlan plan, PrintStream ps) {
        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
    }

    // Internal constructor used when this printer is itself a nested
    // subgraph; shares the subgraph bookkeeping sets with the parent.
    private DotMRPrinter(MROperPlan plan, PrintStream ps, boolean isSubGraph,
                         Set<Operator> subgraphs,
                         Set<Operator> multiInputSubgraphs) {
        super(plan, ps, isSubGraph, subgraphs, multiInputSubgraphs);
    }

    @Override
    public void setVerbose(boolean verbose) {
        // leave the parents verbose set to true
        isVerboseNesting = verbose;
    }

    @Override
    protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
        // Inner (map/combine/reduce) plans are rendered by a dedicated
        // printer that shares this dumper's subgraph bookkeeping.
        return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs);
    }

    /**
     * Builds the node label for an MR operator: which phases are present
     * (map/combine/reduce), the requested parallelism (if any set) and
     * whether the operator takes part in a global sort.
     */
    @Override
    protected String getName(MapReduceOper op) {
        String name = "Map";
        if (!op.combinePlan.isEmpty()) {
            name += " - Combine";
        }
        if (!op.reducePlan.isEmpty()) {
            name += " - Reduce";
        }
        // -1 appears to mean "no explicit parallelism requested" — confirm
        if (op.getRequestedParallelism()!=-1) {
            name += " Parallelism: "+op.getRequestedParallelism();
        }
        name += ", Global Sort: "+op.isGlobalSort();
        return name;
    }

    @Override
    protected Collection<InnerPlan> getNestedPlans(MapReduceOper op) {
        // Every MR operator nests exactly one InnerPlan chaining its
        // map, combine and reduce physical plans.
        Collection<InnerPlan> plans = new LinkedList<InnerPlan>();
        plans.add(new InnerPlan(op.mapPlan, op.combinePlan, op.reducePlan));
        return plans;
    }

    @Override
    protected String[] getAttributes(MapReduceOper op) {
        // DOT node attributes: label plus a light gray fill for MR nodes.
        String[] attributes = new String[3];
        attributes[0] = "label=\""+getName(op)+"\"";
        attributes[1] = "style=\"filled\"";
        attributes[2] = "fillcolor=\"#EEEEEE\"";
        return attributes;
    }


    /**
     * Helper class to represent the relationship of map, reduce and
     * combine phases in an MR operator.
     */
    public class InnerOperator extends Operator<PlanVisitor> {
        String name;       // display name ("Map", "Combine" or "Reduce")
        PhysicalPlan plan; // the physical plan of this phase
        int code;          // unique id taken from counter, doubles as hashCode

        public InnerOperator(PhysicalPlan plan, String name) {
            super(new OperatorKey());
            this.name = name;
            this.plan = plan;
            this.code = counter++;
        }

        // Inner operators are pure display artifacts: no visitor logic,
        // single input, single output.
        @Override public void visit(PlanVisitor v) {}
        @Override public boolean supportsMultipleInputs() {return false;}
        @Override public boolean supportsMultipleOutputs() {return false;}
        @Override public String name() {return name;}
        public PhysicalPlan getPlan() {return plan;}
        // Unique per-instance hashCode; equals() stays the inherited
        // reference equality, so the equals/hashCode contract holds.
        @Override public int hashCode() {return code;}
    }

    /**
     * Helper class to represent the relationship of map, reduce and
     * combine phases in an MR operator. Each MR operator will have
     * an inner plan of map -> (combine)? -> (reduce)? inner
     * operators. The inner operators contain the physical plan of the
     * execution phase.
     */
    public class InnerPlan extends OperatorPlan<InnerOperator> {
        public InnerPlan(PhysicalPlan mapPlan, PhysicalPlan combinePlan,
                         PhysicalPlan reducePlan) {
            try {
                InnerOperator map = new InnerOperator(mapPlan, "Map");

                this.add(map);
                if (!combinePlan.isEmpty()) {
                    // a combine phase implies a reduce: map -> combine -> reduce
                    InnerOperator combine =
                        new InnerOperator(combinePlan, "Combine");
                    InnerOperator reduce =
                        new InnerOperator(reducePlan, "Reduce");
                    this.add(combine);
                    this.connect(map, combine);
                    this.add(reduce);
                    this.connect(combine, reduce);
                }
                else if (!reducePlan.isEmpty()){
                    // reduce without combiner: map -> reduce
                    InnerOperator reduce =
                        new InnerOperator(reducePlan, "Reduce");
                    this.add(reduce);
                    this.connect(map, reduce);
                }
            } catch (PlanException e) {
                // ignored: connecting freshly created, unconnected operators
                // is presumably infallible here — TODO confirm; consider
                // logging instead of swallowing silently
            }
        }
    }

    /**
     * Helper class to represent the relationship of map, reduce and
     * combine phases in an MR operator.
     */
    private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
                                        PhysicalOperator, PhysicalPlan> {

        public InnerPrinter(InnerPlan plan, PrintStream ps,
                            Set<Operator> subgraphs,
                            Set<Operator> multiInputSubgraphs) {
            // always a subgraph: phase plans render as nested DOT clusters
            super(plan, ps, true, subgraphs, multiInputSubgraphs);
        }

        @Override
        protected String[] getAttributes(InnerOperator op) {
            // phase clusters are filled white inside the gray MR node
            String[] attributes = new String[3];
            attributes[0] = "label=\""+getName(op)+"\"";
            attributes[1] = "style=\"filled\"";
            attributes[2] = "fillcolor=\"white\"";
            return attributes;
        }

        @Override
        protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
            // each phase operator nests exactly one plan: its physical plan
            Collection<PhysicalPlan> l = new LinkedList<PhysicalPlan>();
            l.add(op.getPlan());
            return l;
        }

        @Override
        protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
            DotPOPrinter printer = new DotPOPrinter(plan, ps, true,
                                                    mSubgraphs,
                                                    mMultiInputSubgraphs);
            // nested physical detail honors the verbosity requested on the
            // enclosing DotMRPrinter
            printer.setVerbose(isVerboseNesting);
            return printer;
        }
    }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Wed Feb 25 05:18:47 2009
@@ -34,6 +34,7 @@
private PrintStream mStream = null;
private int mIndent = 0;
+ private boolean isVerbose = true;
/**
* @param ps PrintStream to output plan information to
@@ -47,24 +48,31 @@
mStream.println("--------------------------------------------------");
}
+ public void setVerbose(boolean verbose) {
+ isVerbose = verbose;
+ }
+
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
mStream.println("MapReduce node " + mr.getOperatorKey().toString());
if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
mStream.println("Map Plan");
PlanPrinter printer = new PlanPrinter(mr.mapPlan, mStream);
+ printer.setVerbose(isVerbose);
printer.visit();
mStream.println("--------");
}
if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
mStream.println("Combine Plan");
PlanPrinter printer = new PlanPrinter(mr.combinePlan, mStream);
+ printer.setVerbose(isVerbose);
printer.visit();
mStream.println("--------");
}
if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
mStream.println("Reduce Plan");
PlanPrinter printer = new PlanPrinter(mr.reducePlan, mStream);
+ printer.setVerbose(isVerbose);
printer.visit();
mStream.println("--------");
}
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+
/**
 * This class can print a physical plan in the DOT format. It uses
 * clusters to illustrate nesting. If "verbose" is off, it will skip
 * any nesting.
 */
public class DotPOPrinter extends DotPlanDumper<PhysicalOperator, PhysicalPlan,
                                  PhysicalOperator, PhysicalPlan> {

    /**
     * @param plan physical plan to dump
     * @param ps stream the DOT output is written to
     */
    public DotPOPrinter(PhysicalPlan plan, PrintStream ps) {
        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
    }

    // Constructor used when this printer renders a nested subgraph; the
    // subgraph bookkeeping sets are shared with the enclosing dumper.
    public DotPOPrinter(PhysicalPlan plan, PrintStream ps, boolean isSubGraph,
                        Set<Operator> subgraphs,
                        Set<Operator> multiInputSubgraphs) {
        super(plan, ps, isSubGraph, subgraphs, multiInputSubgraphs);
    }

    @Override
    protected DotPlanDumper makeDumper(PhysicalPlan plan, PrintStream ps) {
        // nested plans are printed by another DotPOPrinter sharing state
        return new DotPOPrinter(plan, ps, true, mSubgraphs,
                                mMultiInputSubgraphs);
    }

    @Override
    protected String getName(PhysicalOperator op) {
        // keep only the part of op.name() before the first " - " separator
        // (presumably trimming off the operator key — confirm)
        return (op.name().split(" - "))[0];
    }


    @Override
    protected String[] getAttributes(PhysicalOperator op) {
        // stores and loads are highlighted with a gray fill; every other
        // operator uses the defaults inherited from DotPlanDumper
        if (op instanceof POStore || op instanceof POLoad) {
            String[] attributes = new String[3];
            // ":" separators in the name become ",\n" line breaks in DOT
            attributes[0] = "label=\""+getName(op).replace(":",",\\n")+"\"";
            attributes[1] = "style=\"filled\"";
            attributes[2] = "fillcolor=\"gray\"";
            return attributes;
        }
        else {
            return super.getAttributes(op);
        }
    }

    @Override
    protected Collection<PhysicalPlan> getNestedPlans(PhysicalOperator op) {
        // collect the inner plans of every operator type that nests plans;
        // operators without nested plans yield an empty collection
        Collection<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();

        if(op instanceof POFilter){
            plans.add(((POFilter)op).getPlan());
        }
        else if(op instanceof POForEach){
            plans.addAll(((POForEach)op).getInputPlans());
        }
        else if(op instanceof POSort){
            plans.addAll(((POSort)op).getSortPlans());
        }
        else if(op instanceof POLocalRearrange){
            plans.addAll(((POLocalRearrange)op).getPlans());
        }
        else if(op instanceof POFRJoin) {
            // FR join keeps one list of plans per join input; flatten them
            POFRJoin frj = (POFRJoin)op;
            List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
            if(joinPlans!=null) {
                for (List<PhysicalPlan> list : joinPlans) {
                    plans.addAll(list);
                }
            }
        }

        return plans;
    }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Wed Feb 25 05:18:47 2009
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -67,9 +68,20 @@
* into the given output stream
* @param out : OutputStream to which the visual representation is written
*/
- public void explain(OutputStream out){
+ public void explain(OutputStream out) {
+ explain(out, true);
+ }
+
+ /**
+ * Write a visual representation of the Physical Plan
+ * into the given output stream
+ * @param out : OutputStream to which the visual representation is written
+ * @param verbose : Amount of information to print
+ */
+ public void explain(OutputStream out, boolean verbose){
PlanPrinter<PhysicalOperator, PhysicalPlan> mpp = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
this);
+ mpp.setVerbose(verbose);
try {
mpp.print(out);
@@ -81,6 +93,28 @@
e.printStackTrace();
}
}
+
+ /**
+ * Write a visual representation of the Physical Plan
+ * into the given printstream
+ * @param ps : PrintStream to which the visual representation is written
+ * @param format : Format to print in
+ * @param verbose : Amount of information to print
+ */
+ public void explain(PrintStream ps, String format, boolean verbose) {
+ ps.println("#-----------------------------------------------");
+ ps.println("# Physical Plan:");
+ ps.println("#-----------------------------------------------");
+
+ if (format.equals("text")) {
+ explain((OutputStream)ps, verbose);
+ } else if (format.equals("dot")) {
+ DotPOPrinter pp = new DotPOPrinter(this, ps);
+ pp.setVerbose(verbose);
+ pp.dump();
+ }
+ ps.println("");
+ }
@Override
public void connect(PhysicalOperator from, PhysicalOperator to)
@@ -162,7 +196,7 @@
return "Empty Plan!";
else{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- explain(baos);
+ explain(baos, true);
return baos.toString();
}
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Wed Feb 25 05:18:47 2009
@@ -52,6 +52,8 @@
PrintStream stream = System.out;
+ boolean isVerbose = true;
+
public PlanPrinter(P plan) {
super(plan, new DepthFirstWalker<O, P>(plan));
}
@@ -61,6 +63,10 @@
this.stream = stream;
}
+ public void setVerbose(boolean verbose) {
+ isVerbose = verbose;
+ }
+
@Override
public void visit() throws VisitorException {
try {
@@ -119,7 +125,7 @@
StringBuilder sb = new StringBuilder();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
if(pp!=null)
- pp.explain(baos);
+ pp.explain(baos, isVerbose);
else
return "";
sb.append(USep);
@@ -138,25 +144,27 @@
private String depthFirst(O node) throws VisitorException {
StringBuilder sb = new StringBuilder(node.name() + "\n");
- if(node instanceof POFilter){
+ if (isVerbose) {
+ if(node instanceof POFilter){
sb.append(planString(((POFilter)node).getPlan()));
- }
- else if(node instanceof POLocalRearrange){
+ }
+ else if(node instanceof POLocalRearrange){
sb.append(planString(((POLocalRearrange)node).getPlans()));
- }
- else if(node instanceof POSort){
+ }
+ else if(node instanceof POSort){
sb.append(planString(((POSort)node).getSortPlans()));
- }
- else if(node instanceof POForEach){
+ }
+ else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
- }
- else if(node instanceof POFRJoin){
+ }
+ else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
if(joinPlans!=null)
- for (List<PhysicalPlan> list : joinPlans) {
- sb.append(planString(list));
- }
+ for (List<PhysicalPlan> list : joinPlans) {
+ sb.append(planString(list));
+ }
+ }
}
List<O> originalPredecessors = mPlan.getPredecessors(node);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -184,17 +184,15 @@
throw new UnsupportedOperationException();
}
- public void explain(PhysicalPlan plan, PrintStream stream) {
+ public void explain(PhysicalPlan plan, PrintStream stream,
+ String format, boolean isVerbose) {
try {
- PlanPrinter printer = new PlanPrinter(plan, stream);
- printer.visit();
- stream.println();
-
ExecTools.checkLeafIsStore(plan, pigContext);
// LocalLauncher launcher = new LocalLauncher();
LocalPigLauncher launcher = new LocalPigLauncher();
- launcher.explain(plan, pigContext, stream);
+ launcher.explain(plan, pigContext, stream,
+ format, isVerbose);
} catch (Exception ve) {
throw new RuntimeException(ve);
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Wed Feb 25 05:18:47 2009
@@ -41,10 +41,10 @@
Log log = LogFactory.getLog(getClass());
@Override
- public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps)
- throws PlanException, VisitorException, IOException {
- // TODO Auto-generated method stub
- pp.explain(ps);
+ public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+ String format, boolean isVerbose)
+ throws PlanException, VisitorException, IOException {
+ pp.explain(ps, format, isVerbose);
ps.append('\n');
}
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.impl.plan.Operator;
+
/**
 * This class can print a logical plan in the DOT format. It uses
 * clusters to illustrate nesting. If "verbose" is off, it will skip
 * any nesting.
 */
public class DotLOPrinter extends DotPlanDumper<LogicalOperator, LogicalPlan,
                                  LogicalOperator, LogicalPlan> {

    /**
     * @param plan logical plan to dump
     * @param ps stream the DOT output is written to
     */
    public DotLOPrinter(LogicalPlan plan, PrintStream ps) {
        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
    }

    // Internal constructor used for nested subgraphs; shares the subgraph
    // bookkeeping sets with the enclosing dumper.
    private DotLOPrinter(LogicalPlan plan, PrintStream ps, boolean isSubGraph,
                         Set<Operator> subgraphs,
                         Set<Operator> multiInSubgraphs) {
        super(plan, ps, isSubGraph, subgraphs, multiInSubgraphs);
    }

    @Override
    protected DotPlanDumper makeDumper(LogicalPlan plan, PrintStream ps) {
        // nested plans are printed by another DotLOPrinter sharing state
        return new DotLOPrinter(plan, ps, true, mSubgraphs,
                                mMultiInputSubgraphs);
    }

    @Override
    protected String getName(LogicalOperator op) {
        // strip a trailing "<digits>-<digits>" id from the operator name
        String info = (op.name().split("\\d+-\\d+"))[0];
        if (op instanceof LOProject) {
            // projects additionally show what they project: "*" or columns
            LOProject pr = (LOProject)op;
            info += pr.isStar()?" [*]": pr.getProjection();
        }
        return info;
    }

    @Override
    protected String[] getAttributes(LogicalOperator op) {
        // stores and loads are highlighted with a gray fill; every other
        // operator uses the defaults inherited from DotPlanDumper
        if (op instanceof LOStore || op instanceof LOLoad) {
            String[] attributes = new String[3];
            // ":" separators in the name become ",\n" line breaks in DOT
            attributes[0] = "label=\""+getName(op).replace(":",",\\n")+"\"";
            attributes[1] = "style=\"filled\"";
            attributes[2] = "fillcolor=\"gray\"";
            return attributes;
        }
        else {
            return super.getAttributes(op);
        }
    }

    @Override
    protected MultiMap<LogicalOperator, LogicalPlan>
        getMultiInputNestedPlans(LogicalOperator op) {

        // cogroup and FR join nest one group of plans per input relation;
        // all other operators yield an empty map
        if(op instanceof LOCogroup){
            return ((LOCogroup)op).getGroupByPlans();
        }
        else if(op instanceof LOFRJoin){
            return ((LOFRJoin)op).getJoinColPlans();
        }
        return new MultiMap<LogicalOperator, LogicalPlan>();
    }

    @Override
    protected Collection<LogicalPlan> getNestedPlans(LogicalOperator op) {
        // single-input nested plans (filter/foreach/generate/sort/split);
        // operators without nested plans yield an empty collection
        Collection<LogicalPlan> plans = new LinkedList<LogicalPlan>();

        if(op instanceof LOFilter){
            plans.add(((LOFilter)op).getComparisonPlan());
        }
        else if(op instanceof LOForEach){
            plans.addAll(((LOForEach)op).getForEachPlans());
        }
        else if(op instanceof LOGenerate){
            plans.addAll(((LOGenerate)op).getGeneratePlans());
        }
        else if(op instanceof LOSort){
            plans.addAll(((LOSort)op).getSortColPlans());
        }
        else if(op instanceof LOSplitOutput){
            plans.add(((LOSplitOutput)op).getConditionPlan());
        }

        return plans;
    }
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Wed Feb 25 05:18:47 2009
@@ -46,6 +46,7 @@
private String USep = "| |\n| ";
private int levelCntr = -1;
private OutputStream printer;
+ private boolean isVerbose = true;
/**
* @param ps PrintStream to output plan information to
@@ -66,6 +67,10 @@
}
}
+ public void setVerbose(boolean verbose) {
+ isVerbose = verbose;
+ }
+
public void print(OutputStream printer) throws VisitorException, IOException {
this.printer = printer;
printer.write(depthFirstLP().getBytes());
@@ -114,55 +119,58 @@
try {
sb.append(((ExpressionOperator)node).getFieldSchema());
} catch (Exception e) {
- //sb.append("Caught Exception: " + e.getMessage());
+ sb.append("Caught Exception: " + e.getMessage());
}
} else {
sb.append(" Schema: ");
try {
sb.append(node.getSchema());
} catch (Exception e) {
- //sb.append("Caught exception: " + e.getMessage());
+ sb.append("Caught exception: " + e.getMessage());
}
}
sb.append(" Type: " + DataType.findTypeName(node.getType()));
sb.append("\n");
- if(node instanceof LOFilter){
- sb.append(planString(((LOFilter)node).getComparisonPlan()));
- }
- else if(node instanceof LOForEach){
- sb.append(planString(((LOForEach)node).getForEachPlans()));
- }
- else if(node instanceof LOGenerate){
- sb.append(planString(((LOGenerate)node).getGeneratePlans()));
-
- }
- else if(node instanceof LOCogroup){
- MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
- for (LogicalOperator lo : plans.keySet()) {
- // Visit the associated plans
- for (LogicalPlan plan : plans.get(lo)) {
- sb.append(planString(plan));
+
+ if (isVerbose) {
+ if(node instanceof LOFilter){
+ sb.append(planString(((LOFilter)node).getComparisonPlan()));
+ }
+ else if(node instanceof LOForEach){
+ sb.append(planString(((LOForEach)node).getForEachPlans()));
+ }
+ else if(node instanceof LOGenerate){
+ sb.append(planString(((LOGenerate)node).getGeneratePlans()));
+
+ }
+ else if(node instanceof LOCogroup){
+ MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the associated plans
+ for (LogicalPlan plan : plans.get(lo)) {
+ sb.append(planString(plan));
+ }
}
}
- }
- else if(node instanceof LOFRJoin){
- MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
- for (LogicalOperator lo : plans.keySet()) {
- // Visit the associated plans
- for (LogicalPlan plan : plans.get(lo)) {
- sb.append(planString(plan));
+ else if(node instanceof LOFRJoin){
+ MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the associated plans
+ for (LogicalPlan plan : plans.get(lo)) {
+ sb.append(planString(plan));
+ }
}
}
- }
- else if(node instanceof LOSort){
- sb.append(planString(((LOSort)node).getSortColPlans()));
- }
- else if(node instanceof LOSplitOutput){
- sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
- }
- else if (node instanceof LOProject) {
- sb.append("Input: ");
- sb.append(((LOProject)node).getExpression().name());
+ else if(node instanceof LOSort){
+ sb.append(planString(((LOSort)node).getSortColPlans()));
+ }
+ else if(node instanceof LOSplitOutput){
+ sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
+ }
+ else if (node instanceof LOProject) {
+ sb.append("Input: ");
+ sb.append(((LOProject)node).getExpression().name());
+ }
}
List<LogicalOperator> originalPredecessors = mPlan.getPredecessors(node);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Wed Feb 25 05:18:47 2009
@@ -58,6 +58,24 @@
lpp.print(out);
}
+
+ public void explain(PrintStream ps, String format, boolean verbose)
+ throws VisitorException, IOException {
+ ps.println("#-----------------------------------------------");
+ ps.println("# Logical Plan:");
+ ps.println("#-----------------------------------------------");
+
+ if (format.equals("text")) {
+ LOPrinter lpp = new LOPrinter(ps, this);
+ lpp.setVerbose(verbose);
+ lpp.visit();
+ } else if (format.equals("dot")) {
+ DotLOPrinter lpp = new DotLOPrinter(this, ps);
+ lpp.setVerbose(verbose);
+ lpp.dump();
+ }
+ ps.println("");
+ }
// public String toString() {
// if(mOps.size() == 0)
Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.io.PrintStream;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * This class puts everything that is needed to dump a plan in a
+ * format readable by graphviz's dot algorithm. Out of the box it does
+ * not print any nested plans.
+ */
+public class DotPlanDumper<E extends Operator, P extends OperatorPlan<E>,
+ N extends Operator, S extends OperatorPlan<N>>
+ extends PlanDumper<E, P, S> {
+
+ // Operators whose nested plans were emitted as dot cluster
+ // subgraphs; consulted in dumpEdge so edges attach to the
+ // clusters' invisible anchor nodes.
+ protected Set<Operator> mSubgraphs;
+ // Operators rendered with one inner cluster per input plan
+ // (see dumpMultiInputNestedOperator).
+ protected Set<Operator> mMultiInputSubgraphs;
+ // True when this dumper renders a nested plan; suppresses the
+ // top-level "digraph" wrapper so the output can be embedded.
+ private boolean isSubGraph = false;
+
+ public DotPlanDumper(P plan, PrintStream ps) {
+ this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
+ }
+
+ /**
+ * Constructor used for nested plans. The two subgraph sets are
+ * shared between the outer dumper and all nested dumpers so that
+ * edge generation can see clusters created at any nesting level.
+ */
+ protected DotPlanDumper(P plan, PrintStream ps, boolean isSubGraph,
+ Set<Operator> mSubgraphs,
+ Set<Operator> mMultiInputSubgraphs) {
+ super(plan, ps);
+ this.isSubGraph = isSubGraph;
+ this.mSubgraphs = mSubgraphs;
+ this.mMultiInputSubgraphs = mMultiInputSubgraphs;
+ }
+
+ /**
+ * Dumps the plan. For the outermost plan the output is wrapped in
+ * a "digraph plan {...}" declaration; nested dumpers emit only
+ * their own content.
+ */
+ @Override
+ public void dump() {
+ if (!isSubGraph) {
+ ps.println("digraph plan {");
+ // compound=true is required for the lhead cluster-edge
+ // attributes emitted in dumpEdge to take effect.
+ ps.println("compound=true;");
+ ps.println("node [shape=rect];");
+ }
+ super.dump();
+ if (!isSubGraph) {
+ ps.println("}");
+ }
+ }
+
+ /**
+ * Renders an operator that has nested plans per input: one outer
+ * cluster for the operator and one inner cluster per input.
+ * Invisible nodes anchor the edges into and out of the clusters.
+ */
+ @Override
+ protected void dumpMultiInputNestedOperator(E op, MultiMap<E, S> plans) {
+ dumpInvisibleOutput(op);
+
+ ps.print("subgraph ");
+ ps.print(getClusterID(op));
+ ps.println(" {");
+ join("; ", getAttributes(op));
+ ps.println("labelloc=b;");
+
+ mMultiInputSubgraphs.add(op);
+
+ for (E o: plans.keySet()) {
+ ps.print("subgraph ");
+ ps.print(getClusterID(op, o));
+ ps.println(" {");
+ ps.println("label=\"\";");
+ dumpInvisibleInput(op, o);
+ for (S plan : plans.get(o)) {
+ PlanDumper dumper = makeDumper(plan, ps);
+ dumper.dump();
+ connectInvisibleInput(op, o, plan);
+ }
+ ps.println("};");
+ }
+ ps.println("};");
+
+ // Output connections are emitted after the cluster is closed so
+ // dot does not place these edges inside the cluster body.
+ for (E o: plans.keySet()) {
+ for (S plan: plans.get(o)) {
+ connectInvisibleOutput(op, plan);
+ }
+ }
+ }
+
+ /**
+ * Renders an operator with nested plans as a single cluster
+ * containing all the nested plans.
+ */
+ @Override
+ protected void dumpNestedOperator(E op, Collection<S> plans) {
+ dumpInvisibleOperators(op);
+ ps.print("subgraph ");
+ ps.print(getClusterID(op));
+ ps.println(" {");
+ join("; ", getAttributes(op));
+ ps.println("labelloc=b;");
+
+ mSubgraphs.add(op);
+
+ for (S plan: plans) {
+ PlanDumper dumper = makeDumper(plan, ps);
+ dumper.dump();
+ connectInvisibleInput(op, plan);
+ }
+ ps.println("};");
+
+ for (S plan: plans) {
+ connectInvisibleOutput(op, plan);
+ }
+ }
+
+ /**
+ * Renders a plain operator as a single dot node with its
+ * attribute list.
+ */
+ @Override
+ protected void dumpOperator(E op) {
+ ps.print(getID(op));
+ ps.print(" [");
+ join(", ", getAttributes(op));
+ ps.println("];");
+ }
+
+ /**
+ * Renders an edge between two operators. If either endpoint was
+ * rendered as a cluster, the edge is attached to that cluster's
+ * invisible anchor node and (for the head) visually clipped to the
+ * cluster boundary via the lhead attribute.
+ */
+ @Override
+ protected void dumpEdge(E op, E suc) {
+ String in = getID(op);
+ String out = getID(suc);
+ String attributes = "";
+
+ if (mMultiInputSubgraphs.contains(op) || mSubgraphs.contains(op)) {
+ in = getSubgraphID(op, false);
+ }
+
+ ps.print(in);
+
+ if (mMultiInputSubgraphs.contains(suc)) {
+ out = getSubgraphID(suc, op, true);
+ attributes = " [lhead="+getClusterID(suc,op)+"]";
+ }
+
+ if (mSubgraphs.contains(suc)) {
+ out = getSubgraphID(suc, true);
+ attributes = " [lhead="+getClusterID(suc)+"]";
+ }
+
+ ps.print(" -> ");
+ ps.print(out);
+ ps.println(attributes);
+ }
+
+ /**
+ * Creates the dumper used for nested plans. Subclasses should
+ * override this to keep their own type through the recursion.
+ */
+ @Override
+ protected PlanDumper makeDumper(S plan, PrintStream ps) {
+ return new DotPlanDumper(plan, ps, true,
+ mSubgraphs, mMultiInputSubgraphs);
+ }
+
+ /**
+ * Used to generate the label for an operator.
+ * @param op operator to dump
+ */
+ protected String getName(E op) {
+ return op.name();
+ }
+
+ /**
+ * Used to generate the attributes of a node
+ * @param op operator
+ */
+ protected String[] getAttributes(E op) {
+ String[] attributes = new String[1];
+ attributes[0] = "label=\""+getName(op)+"\"";
+ return attributes;
+ }
+
+
+ // Connects the invisible input anchor of the (op1, op2) cluster to
+ // the roots of the given nested plan.
+ private void connectInvisibleInput(E op1, E op2, S plan) {
+ String in = getSubgraphID(op1, op2, true);
+
+ for (N l: plan.getRoots()) {
+ dumpInvisibleEdge(in, getID(l));
+ }
+ }
+
+ // Connects op's invisible input anchor to the roots of the nested
+ // plan, redirecting to a root's own anchor when that root was
+ // itself rendered as a cluster.
+ private void connectInvisibleInput(E op, S plan) {
+ String in = getSubgraphID(op, true);
+
+ for (N l: plan.getRoots()) {
+ String out;
+ if (mSubgraphs.contains(l) || mMultiInputSubgraphs.contains(l)) {
+ out = getSubgraphID(l, true);
+ } else {
+ out = getID(l);
+ }
+
+ dumpInvisibleEdge(in, out);
+ }
+ }
+
+ // Connects the leaves of the nested plan to op's invisible output
+ // anchor, using a leaf's own anchor when the leaf is a cluster.
+ private void connectInvisibleOutput(E op, S plan) {
+ String out = getSubgraphID(op, false);
+
+ for (N l: plan.getLeaves()) {
+ String in;
+ if (mSubgraphs.contains(l) || mMultiInputSubgraphs.contains(l)) {
+ in = getSubgraphID(l, false);
+ } else {
+ in = getID(l);
+ }
+
+ dumpInvisibleEdge(in, out);
+ }
+ }
+
+ // NOTE(review): not referenced anywhere in this class -- presumably
+ // a convenience for subclasses; confirm before removing.
+ private void connectInvisible(E op, S plan) {
+ connectInvisibleInput(op, plan);
+ connectInvisibleOutput(op, plan);
+ }
+
+ // Emits the invisible node that anchors edges into the
+ // (op1, op2) input cluster.
+ private void dumpInvisibleInput(E op1, E op2) {
+ ps.print(getSubgraphID(op1, op2, true));
+ ps.print(" ");
+ ps.print(getInvisibleAttributes(op1));
+ ps.println(";");
+ }
+
+ // Emits the invisible node that anchors edges into op's cluster.
+ private void dumpInvisibleInput(E op) {
+ ps.print(getSubgraphID(op, true));
+ ps.print(" ");
+ ps.print(getInvisibleAttributes(op));
+ ps.println(";");
+ }
+
+ // Emits the invisible node that anchors edges out of op's cluster.
+ private void dumpInvisibleOutput(E op) {
+ ps.print(getSubgraphID(op, false));
+ ps.print(" ");
+ ps.print(getInvisibleAttributes(op));
+ ps.println(";");
+ }
+
+ // Emits both the input and the output anchor node for op.
+ protected void dumpInvisibleOperators(E op) {
+ dumpInvisibleInput(op);
+ dumpInvisibleOutput(op);
+ }
+
+ private String getClusterID(Operator op1, Operator op2) {
+ return getClusterID(op1)+"_"+getID(op2);
+ }
+
+ // dot lays out subgraphs whose name starts with "cluster" as
+ // visually boxed clusters.
+ private String getClusterID(Operator op) {
+ return "cluster_"+getID(op);
+ }
+
+ private String getSubgraphID(Operator op1, Operator op2, boolean in) {
+ String id = "s"+getID(op1)+"_"+getID(op2);
+ if (in) {
+ id += "_in";
+ }
+ else {
+ id += "_out";
+ }
+ return id;
+ }
+
+ private String getSubgraphID(Operator op, boolean in) {
+ String id = "s"+getID(op);
+ if (in) {
+ id += "_in";
+ }
+ else {
+ id += "_out";
+ }
+ return id;
+ }
+
+ // Node IDs are derived from the operator's hash code.
+ // NOTE(review): distinct operators with colliding hash codes would
+ // share an ID, and Math.abs(Integer.MIN_VALUE) is still negative --
+ // confirm whether hash codes here are identity-based and bounded.
+ private String getID(Operator op) {
+ return ""+Math.abs(op.hashCode());
+ }
+
+ private String getInvisibleAttributes(Operator op) {
+ return "[label=\"\", style=invis, height=0, width=0]";
+ }
+
+ // Emits an edge that influences layout but is not drawn.
+ private void dumpInvisibleEdge(String op, String suc) {
+ ps.print(op);
+ ps.print(" -> ");
+ ps.print(suc);
+ ps.println(" [style=invis];");
+ }
+}