You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/22 03:42:41 UTC

svn commit: r639939 - in /incubator/pig/trunk: ./ lib/ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/tools/grunt/ src/org/apache/pig/tools/pigscript/parser/

Author: olga
Date: Fri Mar 21 19:42:39 2008
New Revision: 639939

URL: http://svn.apache.org/viewvc?rev=639939&view=rev
Log:
PIG-154: moved define and store into QueryParser from Grunt parser

Added:
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/lib/hadoop16.jar
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Mar 21 19:42:39 2008
@@ -177,3 +177,5 @@
 	PIG-164:  Fix memory issue in SpillableMemoryManager to partially clean the list of
 	bags each time a new bag is added rather than waiting until the garbage
 	collector tells us we are out of memory (gates).
+
+    PIG-154: moving parsing for DEFINE and STORE into QueryParser

Modified: incubator/pig/trunk/lib/hadoop16.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop16.jar?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
Binary files - no diff available.

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri Mar 21 19:42:39 2008
@@ -45,6 +45,8 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LODefine;
+import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -233,8 +235,10 @@
             
         // parse the query into a logical plan
         LogicalPlan lp = null;
+        LogicalOperator op = null;
         try {
             lp = (new LogicalPlanBuilder(pigContext).parse(scope, query, aliases, opTable));
+            op = opTable.get(lp.getRoot());
         } catch (ParseException e) {
             throw (IOException) new IOException(e.getMessage()).initCause(e);
         }
@@ -242,6 +246,16 @@
         if (lp.getAlias() != null) {
             aliases.put(lp.getAlias(), lp);
         }
+        
+        // No need to do anything about DEFINE 
+        if (op instanceof LODefine) {
+            return;
+        }
+        
+        // Check if we just processed a LOStore i.e. STORE
+        if (op instanceof LOStore) {
+            runQuery(lp);
+        }
     }
       
     public void dumpSchema(String alias) throws IOException{
@@ -330,6 +344,10 @@
                                                               func,
                                                               pigContext);
 
+        runQuery(storePlan);
+    }
+
+    private void runQuery(LogicalPlan storePlan) throws IOException {
         try {
             ExecPhysicalPlan pp = 
                 pigContext.getExecutionEngine().compile(storePlan, null);
@@ -337,10 +355,10 @@
             pigContext.getExecutionEngine().execute(pp);
         }
         catch (ExecException e) {
-            throw WrappedIOException.wrap("Unable to store alias " + readFrom.getAlias(), e);
+            throw WrappedIOException.wrap("Unable to store alias " + 
+                                          storePlan.getAlias(), e);
         }
     }
-
     /**
      * Provide information on how a pig query will be executed.  For now
      * this information is very developer focussed, and probably not very

Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java?rev=639939&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java Fri Mar 21 19:42:39 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+/** Dummy operator that the query parser returns for a DEFINE statement. */
+public class LODefine extends LogicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    /** Passes the operator table, scope and id straight through to LogicalOperator. */
+    public LODefine(Map<OperatorKey, LogicalOperator> opTable, String scope, long id) {
+        super(opTable, scope, id);
+    }
+    /** Always reports output type 0. */
+    public int getOutputType() {
+        return 0;
+    }
+    /** Always returns null: this operator exposes no schema. */
+    public TupleSchema outputSchema() {
+        return null;
+    }
+    /** Ignores the visitor; the body is intentionally empty. */
+    public void visit(LOVisitor v) {}
+}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Mar 21 19:42:39 2008
@@ -122,7 +122,8 @@
 	
 	
 	
-	String massageFilename(String filename, PigContext pigContext) throws IOException, ParseException {
+	String massageFilename(String filename, PigContext pigContext, boolean checkExists) 
+	throws IOException, ParseException {
 		if (pigContext.getExecType() != ExecType.LOCAL) {
 			if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
 					filename = FileLocalizer.hadoopify(filename, pigContext);
@@ -130,7 +131,7 @@
 			else
 			{
 				// make sure that dfs file exists
-				if (!FileLocalizer.fileExists(filename, pigContext))
+				if (checkExists && !FileLocalizer.fileExists(filename, pigContext))
 				{
 					throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
 				}
@@ -347,6 +348,7 @@
 TOKEN : { <FILTEROP : <STRFILTEROP> | <NUMFILTEROP>  > }
 
 // List all the keywords in the language
+TOKEN : { <DEFINE : "define"> }
 TOKEN : { <LOAD : "load"> }
 TOKEN : { <FILTER : "filter"> }
 TOKEN : { <FOREACH : "foreach"> }
@@ -386,6 +388,7 @@
 TOKEN : { <STREAM : "stream"> }
 TOKEN : { <THROUGH : "through"> }
 TOKEN : { <BACKTICK : "`"> }
+TOKEN : { <STORE : "store"> }
 
 TOKEN:
 {
@@ -504,7 +507,8 @@
 {
 	(
 	(
-	(<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
+	(<DEFINE> op = DefineClause()) 
+|	(<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
 |	((<GROUP> | <COGROUP>) op = CogroupClause())
 |	(<FILTER> op = FilterClause())
 |   (<ORDER> op = OrderClause())
@@ -514,6 +518,7 @@
 |	(<UNION> op = UnionClause())
 |	(<FOREACH> op = ForEachClause())
 |   (<STREAM> op = StreamClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
+|   (<STORE> op = StoreClause())
 	)
     [<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
 	)	
@@ -538,7 +543,7 @@
 			funcSpec += continuous ? "('\t','\n','0')" : "()";
 		}
 		 
-		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext), funcSpec));	
+		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec));	
 		if (continuous)
 			lo.setOutputType(LogicalOperator.MONOTONE);
 		return lo;
@@ -1248,3 +1253,45 @@
 	}
 }
 
+// Parses "<name> <function>(<args>)" (DEFINE itself is consumed by the caller) and registers the spec under <name>.
+LogicalOperator DefineClause() : {Token t; String functionName, functionArgs;}
+{
+    t = <IDENTIFIER>
+    (
+        functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
+        {
+            pigContext.registerFunction(t.image, functionName + "(" + functionArgs + ")");
+        }
+    )
+    {
+        // The registration above is the whole effect of DEFINE; return the dummy LODefine.
+        return new LODefine(opTable, scope, getNextId());
+    }
+}
+
+// Parses "<alias> INTO <file> [USING <func>[(<args>)]]" (STORE itself is consumed by the caller) into an LOStore.
+LogicalOperator StoreClause() : {Token t; String fileName; String functionSpec = null; String functionName, functionArgs;}
+{
+    t = <IDENTIFIER> <INTO> fileName = FileName()
+    (
+        <USING> functionName = QualifiedFunction()
+        {functionSpec = functionName;}
+        (
+            "(" functionArgs = StringList() ")"
+            {functionSpec = functionSpec + "(" + functionArgs + ")";}
+        )?
+    )?
+    {
+        if (functionSpec == null){
+            functionSpec = PigStorage.class.getName();  // no USING clause: fall back to PigStorage
+        }
+        LogicalPlan readFrom = aliases.get(t.image);
+        if (readFrom == null) {  // undefined alias: report it instead of an NPE on getRoot()
+            throw new ParseException("Unable to find alias " + t.image);
+        }
+        // checkExists is false: the output file is not required to exist yet.
+        return new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
+                           new FileSpec(massageFilename(fileName, pigContext, false), functionSpec),
+                           false);
+    }
+}
\ No newline at end of file

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar 21 19:42:39 2008
@@ -112,10 +112,6 @@
         mDone = true;
     }
     
-    protected void processRegisterFunc(String name, String expr) {
-        mPigServer.registerFunction(name, expr);
-    }
-    
     protected void processDescribe(String alias) throws IOException {
         mPigServer.dumpSchema(alias);
     }
@@ -158,10 +154,6 @@
         }
     }
     
-    protected void processStore(String alias, String file, String func) throws IOException {
-        mPigServer.store(alias, file, func);
-    }
-        
     protected void processCat(String path) throws IOException
     {
         try {

Modified: incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Mar 21 19:42:39 2008
@@ -44,8 +44,6 @@
 	
 	abstract protected void quit();
 	
-	abstract protected void processRegisterFunc(String name, String expr);
-	
 	abstract protected void processDescribe(String alias) throws IOException;
 
     abstract protected void processExplain(String alias) throws IOException;
@@ -53,8 +51,6 @@
 	abstract protected void processRegister(String jar) throws IOException;
 
 	abstract protected void processSet(String key, String value) throws IOException, ParseException;
-	
-	abstract protected void processStore(String alias, String file, String func) throws IOException;
 		
 	abstract protected void processCat(String path) throws IOException;
 
@@ -112,7 +108,6 @@
 TOKEN: {<COPY: "cp">}
 TOKEN: {<COPYFROMLOCAL: "copyFromLocal">}
 TOKEN: {<COPYTOLOCAL: "copyToLocal">}
-TOKEN: {<DEFINE: "define">}
 TOKEN: {<DUMP: "dump">}
 TOKEN: {<DESCRIBE: "describe">}
 TOKEN: {<EXPLAIN: "explain">}
@@ -126,15 +121,12 @@
 TOKEN: {<REGISTER: "register">}
 TOKEN: {<REMOVE: "rm">}
 TOKEN: {<SET: "set">}
-TOKEN: {<STORE: "store">}
-TOKEN: {<INTO: "into">}
-TOKEN: {<USING: "using">}
 
 // internal use commands
 TOKEN: {<SCRIPT_DONE: "scriptDone">}
 
 // Define pig command as 
-// (1) Starting with "split" or assignment (A=) followed by
+// (1) Starting with "split"/"define"/"store" or assignment (A=) followed by
 // (2) Single statement followed by ; and newline or
 // (3) Block of statements enclosed in
 
@@ -156,8 +148,9 @@
 
 <DEFAULT> MORE :
 {
-	<"split"> : PIG_START
-| 	<"("> : FUNC_ARGS_START
+    <"split"> : PIG_START
+|   <"define"> : PIG_START
+|	<"store"> : PIG_START
 | 	<(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
 }
 
@@ -213,25 +206,6 @@
 		}: DEFAULT
 }
 
-<FUNC_ARGS_START> MORE :
-{
-	<"("> {funcBlockLevel = 1;} : IN_ARG_BLOCK
-|	<")"> : FUNC_ARGS_END
-|	<(~[])>
-}
-
-<IN_ARG_BLOCK> MORE:
-{
-        <"("> {funcBlockLevel++;}
-|       <")"> {funcBlockLevel--; if (funcBlockLevel == 0) SwitchTo(FUNC_ARGS_END);}
-|       <(~[])>
-}
-
-<FUNC_ARGS_END> TOKEN :
-{
-        <FUNC_ARGS: ""> {matchedToken.image = image.toString();}: DEFAULT
-}
-
 // other
 TOKEN: {<EOL:  "\r" | "\n" | "\r\n">}
 TOKEN: {<QUOTE: "'">}
@@ -302,13 +276,6 @@
 	t2 = GetPath()
 	{processCopyToLocal(t1.image, t2.image);}	
 	|
-	<DEFINE>
-	t1 = <IDENTIFIER>
-	(
-	val = QualifiedFunction()
-	)
-	{processRegisterFunc(t1.image, val);}
-	|
 	<DUMP>
 	t1 = <IDENTIFIER>
 	{processDump(t1.image);}
@@ -374,16 +341,6 @@
 		{processSet(t1.image, t2.image);}
 	)
 	|
-	<STORE>
-	t1 = <IDENTIFIER>
-	<INTO>
-	t2 = GetPath()
-	(
-		<USING>
-		val = QualifiedFunction()
-	)?
-	{processStore(t1.image, unquote(t2.image), val);}
-	|
 	<EOF>
 	{quit();}
 	|
@@ -396,17 +353,6 @@
 	)
 }
 
-String QualifiedFunction()       : {Token t1;StringBuffer s=new StringBuffer();}
-{
-	t1 = GetPath()
-	{s.append(t1.image);}
-	(
-		t1 = <FUNC_ARGS>
-		{s.append(t1.image);}
-	)*
-        {return s.toString();}
-}
-
 Token GetPath() :
 {
 	Token t;
@@ -461,8 +407,6 @@
 	|
 	t = <COPYTOLOCAL>
 	|
-	t = <DEFINE>
-	|
 	t = <DUMP>
 	|
 	t = <DESCRIBE>
@@ -488,12 +432,6 @@
 	t = <REMOVE>
 	|
 	t = <SET>
-	|
-	t = <STORE>
-	|
-	t = <INTO>
-	|
-	t = <USING>
 	|
 	t = <SCRIPT_DONE>
 	)