You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/22 03:42:41 UTC
svn commit: r639939 - in /incubator/pig/trunk: ./ lib/ src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/tools/grunt/ src/org/apache/pig/tools/pigscript/parser/
Author: olga
Date: Fri Mar 21 19:42:39 2008
New Revision: 639939
URL: http://svn.apache.org/viewvc?rev=639939&view=rev
Log:
PIG-154: moved define and store into QueryParser from Grunt parser
Added:
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/lib/hadoop16.jar
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Mar 21 19:42:39 2008
@@ -177,3 +177,5 @@
PIG-164: Fix memory issue in SpillableMemoryManager to partially clean the list of
bags each time a new bag is added rather than waiting until the garbage
collector tells us we are out of memory (gates).
+
+ PIG-154: moving parsing for DEFINE and STORE into QueryParser
Modified: incubator/pig/trunk/lib/hadoop16.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop16.jar?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
Binary files - no diff available.
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri Mar 21 19:42:39 2008
@@ -45,6 +45,8 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LODefine;
+import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -233,8 +235,10 @@
// parse the query into a logical plan
LogicalPlan lp = null;
+ LogicalOperator op = null;
try {
lp = (new LogicalPlanBuilder(pigContext).parse(scope, query, aliases, opTable));
+ op = opTable.get(lp.getRoot());
} catch (ParseException e) {
throw (IOException) new IOException(e.getMessage()).initCause(e);
}
@@ -242,6 +246,16 @@
if (lp.getAlias() != null) {
aliases.put(lp.getAlias(), lp);
}
+
+ // No need to do anything about DEFINE
+ if (op instanceof LODefine) {
+ return;
+ }
+
+ // Check if we just processed a LOStore i.e. STORE
+ if (op instanceof LOStore) {
+ runQuery(lp);
+ }
}
public void dumpSchema(String alias) throws IOException{
@@ -330,6 +344,10 @@
func,
pigContext);
+ runQuery(storePlan);
+ }
+
+ private void runQuery(LogicalPlan storePlan) throws IOException {
try {
ExecPhysicalPlan pp =
pigContext.getExecutionEngine().compile(storePlan, null);
@@ -337,10 +355,10 @@
pigContext.getExecutionEngine().execute(pp);
}
catch (ExecException e) {
- throw WrappedIOException.wrap("Unable to store alias " + readFrom.getAlias(), e);
+ throw WrappedIOException.wrap("Unable to store alias " +
+ storePlan.getAlias(), e);
}
}
-
/**
* Provide information on how a pig query will be executed. For now
* this information is very developer focussed, and probably not very
Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java?rev=639939&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODefine.java Fri Mar 21 19:42:39 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+public class LODefine extends LogicalOperator {
+ private static final long serialVersionUID = 1L;
+
+ public LODefine(Map<OperatorKey, LogicalOperator> opTable,
+ String scope,
+ long id) {
+ super(opTable, scope, id);
+ }
+
+ public int getOutputType() {
+ return 0;
+ }
+
+ public TupleSchema outputSchema() {
+ return null;
+ }
+
+ public void visit(LOVisitor v) {}
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Mar 21 19:42:39 2008
@@ -122,7 +122,8 @@
- String massageFilename(String filename, PigContext pigContext) throws IOException, ParseException {
+ String massageFilename(String filename, PigContext pigContext, boolean checkExists)
+ throws IOException, ParseException {
if (pigContext.getExecType() != ExecType.LOCAL) {
if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
filename = FileLocalizer.hadoopify(filename, pigContext);
@@ -130,7 +131,7 @@
else
{
// make sure that dfs file exists
- if (!FileLocalizer.fileExists(filename, pigContext))
+ if (checkExists && !FileLocalizer.fileExists(filename, pigContext))
{
throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
}
@@ -347,6 +348,7 @@
TOKEN : { <FILTEROP : <STRFILTEROP> | <NUMFILTEROP> > }
// List all the keywords in the language
+TOKEN : { <DEFINE : "define"> }
TOKEN : { <LOAD : "load"> }
TOKEN : { <FILTER : "filter"> }
TOKEN : { <FOREACH : "foreach"> }
@@ -386,6 +388,7 @@
TOKEN : { <STREAM : "stream"> }
TOKEN : { <THROUGH : "through"> }
TOKEN : { <BACKTICK : "`"> }
+TOKEN : { <STORE : "store"> }
TOKEN:
{
@@ -504,7 +507,8 @@
{
(
(
- (<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
+ (<DEFINE> op = DefineClause())
+| (<LOAD> op = LoadClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
| ((<GROUP> | <COGROUP>) op = CogroupClause())
| (<FILTER> op = FilterClause())
| (<ORDER> op = OrderClause())
@@ -514,6 +518,7 @@
| (<UNION> op = UnionClause())
| (<FOREACH> op = ForEachClause())
| (<STREAM> op = StreamClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
+| (<STORE> op = StoreClause())
)
[<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
)
@@ -538,7 +543,7 @@
funcSpec += continuous ? "('\t','\n','0')" : "()";
}
- lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext), funcSpec));
+ lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec));
if (continuous)
lo.setOutputType(LogicalOperator.MONOTONE);
return lo;
@@ -1248,3 +1253,45 @@
}
}
+LogicalOperator DefineClause() : {Token t; Token t1; String functionName, functionArgs;}
+{
+ t = <IDENTIFIER>
+ (
+ functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
+ {
+ pigContext.registerFunction(t.image,
+ (functionName + "(" + functionArgs + ")"));
+ }
+ )
+ {
+ // Return the dummy LODefine
+ return new LODefine(opTable, scope, getNextId());
+ }
+}
+
+LogicalOperator StoreClause() : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; String functionName, functionArgs;}
+{
+ t = <IDENTIFIER> <INTO> fileName = FileName()
+ (
+ <USING> functionName = QualifiedFunction()
+ {functionSpec = functionName;}
+ (
+ "(" functionArgs = StringList() ")"
+ {functionSpec = functionSpec + "(" + functionArgs + ")";}
+ )?
+ )?
+ {
+ if (functionSpec == null){
+ functionSpec = PigStorage.class.getName();
+ }
+
+ LogicalPlan readFrom = aliases.get(t.image);
+
+ lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
+ new FileSpec(massageFilename(fileName, pigContext, false),
+ functionSpec),
+ false);
+
+ return lo;
+ }
+}
\ No newline at end of file
Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar 21 19:42:39 2008
@@ -112,10 +112,6 @@
mDone = true;
}
- protected void processRegisterFunc(String name, String expr) {
- mPigServer.registerFunction(name, expr);
- }
-
protected void processDescribe(String alias) throws IOException {
mPigServer.dumpSchema(alias);
}
@@ -158,10 +154,6 @@
}
}
- protected void processStore(String alias, String file, String func) throws IOException {
- mPigServer.store(alias, file, func);
- }
-
protected void processCat(String path) throws IOException
{
try {
Modified: incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=639939&r1=639938&r2=639939&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Mar 21 19:42:39 2008
@@ -44,8 +44,6 @@
abstract protected void quit();
- abstract protected void processRegisterFunc(String name, String expr);
-
abstract protected void processDescribe(String alias) throws IOException;
abstract protected void processExplain(String alias) throws IOException;
@@ -53,8 +51,6 @@
abstract protected void processRegister(String jar) throws IOException;
abstract protected void processSet(String key, String value) throws IOException, ParseException;
-
- abstract protected void processStore(String alias, String file, String func) throws IOException;
abstract protected void processCat(String path) throws IOException;
@@ -112,7 +108,6 @@
TOKEN: {<COPY: "cp">}
TOKEN: {<COPYFROMLOCAL: "copyFromLocal">}
TOKEN: {<COPYTOLOCAL: "copyToLocal">}
-TOKEN: {<DEFINE: "define">}
TOKEN: {<DUMP: "dump">}
TOKEN: {<DESCRIBE: "describe">}
TOKEN: {<EXPLAIN: "explain">}
@@ -126,15 +121,12 @@
TOKEN: {<REGISTER: "register">}
TOKEN: {<REMOVE: "rm">}
TOKEN: {<SET: "set">}
-TOKEN: {<STORE: "store">}
-TOKEN: {<INTO: "into">}
-TOKEN: {<USING: "using">}
// internal use commands
TOKEN: {<SCRIPT_DONE: "scriptDone">}
// Define pig command as
-// (1) Starting with "split" or assignment (A=) followed by
+// (1) Starting with "split"/"define"/"store" or assignment (A=) followed by
// (2) Single statement followed by ; and newline or
// (3) Block of statements enclosed in
@@ -156,8 +148,9 @@
<DEFAULT> MORE :
{
- <"split"> : PIG_START
-| <"("> : FUNC_ARGS_START
+ <"split"> : PIG_START
+| <"define"> : PIG_START
+| <"store"> : PIG_START
| <(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
}
@@ -213,25 +206,6 @@
}: DEFAULT
}
-<FUNC_ARGS_START> MORE :
-{
- <"("> {funcBlockLevel = 1;} : IN_ARG_BLOCK
-| <")"> : FUNC_ARGS_END
-| <(~[])>
-}
-
-<IN_ARG_BLOCK> MORE:
-{
- <"("> {funcBlockLevel++;}
-| <")"> {funcBlockLevel--; if (funcBlockLevel == 0) SwitchTo(FUNC_ARGS_END);}
-| <(~[])>
-}
-
-<FUNC_ARGS_END> TOKEN :
-{
- <FUNC_ARGS: ""> {matchedToken.image = image.toString();}: DEFAULT
-}
-
// other
TOKEN: {<EOL: "\r" | "\n" | "\r\n">}
TOKEN: {<QUOTE: "'">}
@@ -302,13 +276,6 @@
t2 = GetPath()
{processCopyToLocal(t1.image, t2.image);}
|
- <DEFINE>
- t1 = <IDENTIFIER>
- (
- val = QualifiedFunction()
- )
- {processRegisterFunc(t1.image, val);}
- |
<DUMP>
t1 = <IDENTIFIER>
{processDump(t1.image);}
@@ -374,16 +341,6 @@
{processSet(t1.image, t2.image);}
)
|
- <STORE>
- t1 = <IDENTIFIER>
- <INTO>
- t2 = GetPath()
- (
- <USING>
- val = QualifiedFunction()
- )?
- {processStore(t1.image, unquote(t2.image), val);}
- |
<EOF>
{quit();}
|
@@ -396,17 +353,6 @@
)
}
-String QualifiedFunction() : {Token t1;StringBuffer s=new StringBuffer();}
-{
- t1 = GetPath()
- {s.append(t1.image);}
- (
- t1 = <FUNC_ARGS>
- {s.append(t1.image);}
- )*
- {return s.toString();}
-}
-
Token GetPath() :
{
Token t;
@@ -461,8 +407,6 @@
|
t = <COPYTOLOCAL>
|
- t = <DEFINE>
- |
t = <DUMP>
|
t = <DESCRIBE>
@@ -488,12 +432,6 @@
t = <REMOVE>
|
t = <SET>
- |
- t = <STORE>
- |
- t = <INTO>
- |
- t = <USING>
|
t = <SCRIPT_DONE>
)