You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/12 20:56:31 UTC
svn commit: r694774 - in /incubator/pig/branches/types: CHANGES.txt
src/org/apache/pig/PigServer.java
src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
test/org/apache/pig/test/TestAlgebraicEval.java
Author: olga
Date: Fri Sep 12 11:56:30 2008
New Revision: 694774
URL: http://svn.apache.org/viewvc?rev=694774&view=rev
Log:
Code to clone operators
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 12 11:56:30 2008
@@ -190,3 +190,4 @@
PIG-422: cross is broken (shravanmn via olgan)
+ PIG-407: need to clone operators (pradeepk via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 12 11:56:30 2008
@@ -57,7 +57,9 @@
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.util.PropertiesUtil;
@@ -94,6 +96,7 @@
PigContext pigContext;
private String scope = constructScope();
+ private ArrayList<String> cachedScript = new ArrayList<String>();
private String constructScope() {
// scope servers for now as a session id
@@ -251,27 +254,14 @@
* @throws IOException
*/
public void registerQuery(String query, int startLine) throws IOException {
- // Bugzilla Bug 1006706 -- ignore empty queries
- //=============================================
- if(query != null) {
- query = query.trim();
- if(query.length() == 0) return;
- }else {
- return;
- }
- LogicalPlan lp = null;
- LogicalOperator op = null;
- try {
- lp = (new LogicalPlanBuilder(pigContext).parse(scope, query,
- aliases, opTable, aliasOp, startLine));
- } catch (ParseException e) {
- throw (IOException) new IOException(e.getMessage()).initCause(e);
- }
+ LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, aliasOp);
+ // store away the query for use in cloning later
+ cachedScript .add(query);
if (lp.getLeaves().size() == 1)
{
- op = lp.getSingleLeafPlanOutputOp();
+ LogicalOperator op = lp.getSingleLeafPlanOutputOp();
// No need to do anything about DEFINE
if (op instanceof LODefine) {
return;
@@ -280,14 +270,76 @@
// Check if we just processed a LOStore i.e. STORE
if (op instanceof LOStore) {
try{
- execute(lp);
+ execute(null);
} catch (Exception e) {
throw WrappedIOException.wrap("Unable to store for alias: " + op.getOperatorKey().getId(), e);
}
}
}
}
+
+ private LogicalPlan parseQuery(String query, int startLine, Map<LogicalOperator, LogicalPlan> aliasesMap,
+ Map<OperatorKey, LogicalOperator> opTableMap, Map<String, LogicalOperator> aliasOpMap) throws IOException {
+ if(query != null) {
+ query = query.trim();
+ if(query.length() == 0) return null;
+ }else {
+ return null;
+ }
+ try {
+ return new LogicalPlanBuilder(pigContext).parse(scope, query,
+ aliasesMap, opTableMap, aliasOpMap, startLine);
+ } catch (ParseException e) {
+ throw (IOException) new IOException(e.getMessage()).initCause(e);
+ }
+ }
+ public LogicalPlan clonePlan(String alias) throws IOException {
+ // There are two choices on how we clone the logical plan
+ // 1 - we really clone each operator and connect up the cloned operators
+ // 2 - we cache away the script till the point we need to clone
+ // and then simply re-parse the script.
+ // The latter approach is used here
+ // FIXME: There is one open issue with this now:
+ // Consider the following script:
+ // A = load 'file:/somefile';
+ // B = filter A by $0 > 10;
+ // store B into 'bla';
+ // rm 'file:/somefile';
+ // A = load 'file:/someotherfile'
+ // when we try to clone - we try to reparse
+ // from the beginning and currently the parser
+ // checks for file existence of files in the load
+ // in the case where the file is a local one -i.e. with file: prefix
+ // This will be a known issue now and we will need to revisit later
+
+ // parse each line of the cached script and the
+ // final logical plan is the clone that we want
+ LogicalPlan lp = null;
+ int lineNumber = 1;
+ // create data structures needed for parsing
+ Map<LogicalOperator, LogicalPlan> cloneAliases = new HashMap<LogicalOperator, LogicalPlan>();
+ Map<OperatorKey, LogicalOperator> cloneOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, LogicalOperator>();
+ for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); lineNumber++) {
+ lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, cloneAliasOp);
+ }
+
+ if(alias == null) {
+ // a store prompted the execution - so return
+ // the entire logical plan
+ return lp;
+ } else {
+ // return the logical plan corresponding to the
+ // alias supplied
+ LogicalOperator op = cloneAliasOp.get(alias);
+ if(op == null) {
+ throw new IOException("Unable to find an operator for alias " + alias);
+ }
+ return cloneAliases.get(op);
+ }
+ }
+
public void registerQuery(String query) throws IOException {
registerQuery(query, 1);
}
@@ -296,7 +348,7 @@
try {
LogicalPlan lp = getPlanFromAlias(alias, "describe");
try {
- lp = compileLp(lp, "describe", false);
+ lp = compileLp(alias, false);
} catch (ExecException e) {
throw new FrontendException(e.getMessage());
}
@@ -373,9 +425,26 @@
String filename,
String func) throws IOException {
try {
- LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
- scope, readFrom, filename, func, aliasOp.get(id), aliases);
- return execute(storePlan);
+ LogicalPlan lp = compileLp(id);
+
+ // MRCompiler needs a store to be the leaf - hence
+ // add a store to the plan to be executed
+
+ // figure out the leaf to which the store needs to be added
+ List<LogicalOperator> leaves = lp.getLeaves();
+ LogicalOperator leaf = null;
+ if(leaves.size() == 1) {
+ leaf = leaves.get(0);
+ } else {
+ for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+ LogicalOperator leafOp = it.next();
+ if(leafOp.getAlias().equals(id))
+ leaf = leafOp;
+ }
+ }
+
+ LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf);
+ return executeCompiledLogicalPlan(storePlan);
} catch (Exception e) {
throw WrappedIOException.wrap("Unable to store for alias: " +
id, e);
@@ -394,20 +463,27 @@
public void explain(String alias,
PrintStream stream) throws IOException {
try {
- LogicalOperator op = aliasOp.get(alias);
- if(null == op) {
- throw new IOException("Unable to find an operator for alias " + alias);
+ LogicalPlan lp = compileLp(alias);
+
+ // MRCompiler needs a store to be the leaf - hence
+ // add a store to the plan to explain
+
+ // figure out the leaf to which the store needs to be added
+ List<LogicalOperator> leaves = lp.getLeaves();
+ LogicalOperator leaf = null;
+ for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+ LogicalOperator leafOp = it.next();
+ if(leafOp.getAlias().equals(alias))
+ leaf = leafOp;
}
- LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
- scope, getPlanFromAlias(alias, op.getClass().getName()),
- "fakefile", PigStorage.class.getName(), aliasOp.get(alias),
- aliases);
- LogicalPlan lp = compileLp(storePlan, "explain");
+
+ LogicalPlan storePlan = QueryParser.generateStorePlan(
+ scope, lp, "fakefile", PigStorage.class.getName(), leaf);
stream.println("Logical Plan:");
- LOPrinter lv = new LOPrinter(stream, lp);
+ LOPrinter lv = new LOPrinter(stream, storePlan);
lv.visit();
- PhysicalPlan pp = compilePp(lp);
+ PhysicalPlan pp = compilePp(storePlan);
stream.println("-----------------------------------------------");
stream.println("Physical Plan:");
@@ -530,13 +606,18 @@
// pigContext.getExecutionEngine().reclaimScope(this.scope);
}
- private ExecJob execute(
- LogicalPlan lp) throws FrontendException, ExecException {
+ private ExecJob execute(String alias) throws FrontendException, ExecException {
ExecJob job = null;
// lp.explain(System.out, System.err);
- LogicalPlan typeCheckedLp = compileLp(lp, "execute");
+ LogicalPlan typeCheckedLp = compileLp(alias);
+
+ return executeCompiledLogicalPlan(typeCheckedLp);
// typeCheckedLp.explain(System.out, System.err);
- PhysicalPlan pp = compilePp(typeCheckedLp);
+
+ }
+
+ private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
+ PhysicalPlan pp = compilePp(compiledLp);
// execute using appropriate engine
FileLocalizer.clearDeleteOnFail();
ExecJob execJob = pigContext.getExecutionEngine().execute(pp, "execute");
@@ -546,24 +627,26 @@
}
private LogicalPlan compileLp(
- LogicalPlan lp,
- String operation) throws ExecException, FrontendException {
- return compileLp(lp, operation, true);
+ String alias) throws ExecException, FrontendException {
+ return compileLp(alias, true);
}
private LogicalPlan compileLp(
- LogicalPlan lp,
- String operation,
+ String alias,
boolean optimize) throws ExecException, FrontendException {
- // Look up the logical plan in the aliases map. That plan will be
- // properly connected to all the others.
-
- if(null == lp) {
- throw new FrontendException("Cannot operate on null logical plan");
+
+ // create a clone of the logical plan and give it
+ // to the operations below
+ LogicalPlan lpClone;
+ try {
+ lpClone = clonePlan(alias);
+ } catch (IOException e) {
+ throw new FrontendException("Unable to clone plan before compiling", e);
}
+
// Set the logical plan values correctly in all the operators
- PlanSetter ps = new PlanSetter(lp);
+ PlanSetter ps = new PlanSetter(lpClone);
ps.visit();
//(new SplitIntroducer(lp)).introduceImplSplits();
@@ -573,8 +656,8 @@
FrontendException caught = null;
try {
LogicalPlanValidationExecutor validator =
- new LogicalPlanValidationExecutor(lp, pigContext);
- validator.validate(lp, collector);
+ new LogicalPlanValidationExecutor(lpClone, pigContext);
+ validator.validate(lpClone, collector);
} catch (FrontendException fe) {
// Need to go through and see what the collector has in it. But
// remember what we've caught so we can wrap it into what we
@@ -612,11 +695,11 @@
// optimize
if (optimize) {
- LogicalOptimizer optimizer = new LogicalOptimizer(lp);
+ LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
optimizer.optimize();
}
- return lp;
+ return lpClone;
}
private PhysicalPlan compilePp(LogicalPlan lp) throws ExecException {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Sep 12 11:56:30 2008
@@ -110,13 +110,11 @@
return str;
}
- public static LogicalPlan generateStorePlan(Map<OperatorKey, LogicalOperator> opTable,
- String scope,
+ public static LogicalPlan generateStorePlan(String scope,
LogicalPlan readFrom,
String fileName,
String func,
- LogicalOperator input,
- Map<LogicalOperator, LogicalPlan> aliases) throws FrontendException {
+ LogicalOperator input) throws FrontendException {
if (func == null) {
func = PigStorage.class.getName();
@@ -146,8 +144,6 @@
throw new FrontendException(pe.getMessage());
}
- aliases.put(store, storePlan);
-
if (storePlan.getRoots().size() == 0) throw new RuntimeException("Store plan has no roots!");
return storePlan;
}
@@ -298,6 +294,7 @@
LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan);
splitOp.addOutput(splitOutput);
addAlias(alias, splitOutput);
+ splitOutput.setAlias(alias);
addLogicalPlan(splitOutput, lp);
lp.add(splitOutput);
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Fri Sep 12 11:56:30 2008
@@ -53,9 +53,9 @@
MiniCluster cluster = MiniCluster.buildCluster();
@Test
public void testGroupCountWithMultipleFields() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
for (int k = 0; k < nullFlags.length; k++) {
System.err.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
- File tmpFile = File.createTempFile("test", "txt");
// flag to indicate if both the keys forming
// the group key are null
int groupKeyWithNulls = 0;
@@ -98,7 +98,6 @@
pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by ($0,$1);");
pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
Iterator<Tuple> it = pig.openIterator("b");
- tmpFile.delete();
int count = 0;
System.err.println("XX Starting");
while(it.hasNext()){
@@ -123,6 +122,7 @@
assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k], LOOP_COUNT - groupKeyWithNulls + 1, count);
}
+ tmpFile.delete();
}