You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/02/15 02:09:01 UTC

svn commit: r1070714 - in /pig/trunk: build.xml src/org/apache/pig/parser/AliasMasker.g src/org/apache/pig/parser/MacroExpansion.g src/org/apache/pig/parser/PigMacro.java test/org/apache/pig/test/TestMacroExpansion.java

Author: rding
Date: Tue Feb 15 01:09:01 2011
New Revision: 1070714

URL: http://svn.apache.org/viewvc?rev=1070714&view=rev
Log:
PIG-1793: Add macro expansion to Pig Latin

Modified:
    pig/trunk/build.xml
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/MacroExpansion.g
    pig/trunk/src/org/apache/pig/parser/PigMacro.java
    pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Tue Feb 15 01:09:01 2011
@@ -320,7 +320,7 @@
 	    description="generates token parser class from an ANTLR grammar">
 	    <java classname="org.antlr.Tool"
 	      classpathref="classpath" fork="true">
-		      <arg line="-o ${src.gen.dir}/${grammar.package.dir} ${src.dir}/${grammar.package.dir}/MacroExpansion.g ${src.dir}/${grammar.package.dir}/MacroImport.g ${src.dir}/${grammar.package.dir}/${grammar.name}Parser.g"/>
+		      <arg line="-o ${src.gen.dir}/${grammar.package.dir} ${src.dir}/${grammar.package.dir}/MacroExpansion.g ${src.dir}/${grammar.package.dir}/MacroRecursion.g ${src.dir}/${grammar.package.dir}/MacroImport.g ${src.dir}/${grammar.package.dir}/${grammar.name}Parser.g"/>
 	    </java>
 	  </target>
 	

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Tue Feb 15 01:09:01 2011
@@ -75,6 +75,8 @@ private String getMask(String alias) {
 
 private Set<String> params = new HashSet<String>();
 
+private Set<String> aliasSeen = new HashSet<String>();
+
 private String macroName = "";
 
 private long index = 0;
@@ -109,7 +111,8 @@ foreach_statement 
     : ^( STATEMENT ( alias { sb.append(" = "); } )? foreach_clause { sb.append("\n"); } )
 ;
 
-alias : IDENTIFIER { sb.append(getMask($IDENTIFIER.text)); }
+alias 
+    : IDENTIFIER { sb.append(getMask($IDENTIFIER.text)); aliasSeen.add($IDENTIFIER.text); }
 ;
 
 op_clause : define_clause 
@@ -530,7 +533,13 @@ col_ref : alias_col_ref | dollar_col_ref
 
 alias_col_ref 
     : GROUP { sb.append($GROUP.text); } 
-    | IDENTIFIER { sb.append($IDENTIFIER.text); } 
+    | name = IDENTIFIER {  
+        if (aliasSeen.contains($name.text)) {
+            sb.append(getMask($name.text));
+        } else {
+            sb.append($name.text);
+        } 
+    } 
 ;
 
 dollar_col_ref 
@@ -689,3 +698,4 @@ rel_str_op 
     | STR_OP_LTE { sb.append($STR_OP_LTE.text); }
     | STR_OP_MATCHES { sb.append($STR_OP_MATCHES.text); }
 ;
+

Modified: pig/trunk/src/org/apache/pig/parser/MacroExpansion.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/MacroExpansion.g?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/MacroExpansion.g (original)
+++ pig/trunk/src/org/apache/pig/parser/MacroExpansion.g Tue Feb 15 01:09:01 2011
@@ -27,12 +27,11 @@ package org.apache.pig.parser;
 
 import java.util.HashMap;
 import java.util.List;
-import java.io.IOException;
 import org.apache.pig.parser.PigMacro;
 }
 
 @members {
-    private HashMap<String, PigMacro> memory = new HashMap<String, PigMacro>();
+    private Map<String, PigMacro> memory = new HashMap<String, PigMacro>();
     private StringBuilder sb = new StringBuilder();
     
     public String getResultString() { return sb.toString(); } 
@@ -42,7 +41,7 @@ MACRO 
     : 'define' WS name=ALIAS WS? '(' ( params+=ALIAS (',' WS* params+=ALIAS)* )? ')' WS 'returns' WS rets+=ALIAS (',' WS* rets+=ALIAS)* WS  '{' content=BLOCK '};' 
         {
             PigMacro macro = new PigMacro($name.text);
-            macro.setBody($content.text);
+            macro.setBody($content.text, memory);
             if ($params != null) {
                 for (Object param : $params) {
                     macro.addParam(((Token)param).getText());

Modified: pig/trunk/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/PigMacro.java?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/trunk/src/org/apache/pig/parser/PigMacro.java Tue Feb 15 01:09:01 2011
@@ -18,22 +18,24 @@
 package org.apache.pig.parser;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.antlr.runtime.ANTLRReaderStream;
 import org.antlr.runtime.CharStream;
 import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.Token;
 import org.antlr.runtime.tree.CommonTreeNodeStream;
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
 
@@ -45,18 +47,22 @@ public class PigMacro {
     private String body;
     private List<String> params;
     private List<String> rets;
+    private Map<String, PigMacro> seen;
     private long idx = 0;
 
     public PigMacro(String name) {
         this.name = name;
         this.params = new ArrayList<String>();
         this.rets = new ArrayList<String>();
+        LOG.info("Macro '" + name + "' is defined");
     }
 
-    public void setBody(String body) {
+    public void setBody(String body, Map<String, PigMacro> seen) {
         this.body = body;
+        this.seen = new HashMap<String, PigMacro>(seen);
+        expandBody();
     }
-
+    
     public void addParam(String param) {
         params.add(param);
     }
@@ -74,8 +80,20 @@ public class PigMacro {
     public List<String> getReturns() { return rets; }
 
     public String inline(String[] inputs, String[] outputs) {
-        ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(
-                50);
+        String in = substituteParams(inputs, outputs);
+        Set<String> masks = new HashSet<String>();
+        if (inputs != null) {
+            for (String s : inputs) {
+                masks.add(s);
+            }
+        }
+        for (String s : outputs) {
+            masks.add(s);
+        }
+        return maskAlias(in, masks);
+    }
+    
+    public String substituteParams(String[] inputs, String[] outputs) {
         if ((inputs == null && !params.isEmpty())
                 || (inputs != null && inputs.length != params.size())) {
             throw new RuntimeException("Failed to expand macro '" + name
@@ -83,26 +101,29 @@ public class PigMacro {
                     + " actual number of inputs: "
                     + ((inputs == null) ? 0 : inputs.length));
         }
-        if ((outputs == null && !rets.isEmpty())
-                || (outputs != null && outputs.length != rets.size())) {
+        if (outputs == null || outputs.length != rets.size()) {
             throw new RuntimeException("Failed to expand macro '" + name
                     + "': expected number of return aliases: " + rets.size()
                     + " actual number of return values: "
                     + ((outputs == null) ? 0 : outputs.length));
         }
-        Set<String> masks = new HashSet<String>();
+        
         String[] args = new String[params.size() + rets.size()];
         for (int i=0; i<params.size(); i++) {
-            args[i] = params.get(i) + "=" + inputs[i];
-            masks.add(inputs[i]);
+            String p = inputs[i];
+            p = p.startsWith("$") ? ("\\\\" + p) : p;
+            args[i] = params.get(i) + "=" + p;
         }
         for (int i=0; i<rets.size(); i++) {
-            args[params.size() + i] = rets.get(i) + "=" + outputs[i];
-            masks.add(outputs[i]);
+            String p = outputs[i];
+            p = p.startsWith("$") ? ("\\\\" + p) : p;
+            args[params.size() + i] = rets.get(i) + "=" + p;
         }
         StringWriter writer = new StringWriter();
         BufferedReader in = new BufferedReader(new StringReader(body));
         try {
+            ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(
+                    50);
             psp.genSubstitutedFile(in, writer, args, null);
         } catch (ParseException e) {
             throw new RuntimeException(
@@ -110,10 +131,14 @@ public class PigMacro {
         }
         
         LOG.debug("--- after substition:\n" + writer.toString());
-
+        
+        return writer.toString();
+    }
+    
+    public String maskAlias(String in, Set<String> masks) {
         String resultString = "";
         try {
-            CharStream input = new QueryParserStringStream(writer.toString());
+            CharStream input = new QueryParserStringStream(in);
             QueryLexer lex = new QueryLexer(input);
             CommonTokenStream tokens = new  CommonTokenStream(lex);
 
@@ -141,4 +166,25 @@ public class PigMacro {
         return resultString;
     }
     
+    private void expandBody() {
+        // expand macros
+        boolean done = false;
+        
+        while (!done) {
+            StringReader srd = new StringReader(body);
+            ANTLRReaderStream input;
+            try {
+                input = new ANTLRReaderStream(srd);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read ", e);
+            }
+            MacroRecursion expander = new MacroRecursion(input);
+            expander.setMacros(seen);
+            Token token = Token.EOF_TOKEN;
+            while ((token = expander.nextToken()) != Token.EOF_TOKEN);
+        
+            body = expander.getResultString();
+            done = !expander.isExpanded();
+        }
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java Tue Feb 15 01:09:01 2011
@@ -1058,6 +1058,144 @@ public class TestMacroExpansion {
         
         validate(s);
     }
+    
+    @Test
+    public void recursiveMacrosTest() throws Throwable {
+        String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+            "    C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+            "    $B = foreach_count(C, $A);" +
+            "};\n";
+        
+        String macro2 = "define foreach_count(A, C) returns B {\n" +
+            "    $B = foreach $A generate group, COUNT($C);\n" +
+            "};\n";
+        
+        
+        String script = macro2 + macro1 +
+            "alpha = load 'users' as (user, age, zip);\n" +
+            "gamma = group_and_partition (alpha, user, 23);\n" +
+            "store gamma into 'byuser';\n";
+
+        StringReader rd = new StringReader(script);
+        String s = ParserUtil.expandMacros(rd);
+
+        validate(s);
+
+        String expected =
+            "\n\nalpha = load 'users' as (user, age, zip);\n" +
+            "macro_group_and_partition_C_0 = group alpha by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 23;\n" +
+            "gamma = foreach macro_group_and_partition_C_0 generate group, COUNT(alpha);\n\n" +
+            "store gamma into 'byuser';\n";
+
+        Assert.assertEquals(expected, s);
+    }
+    
+    @Test
+    public void recursiveMacrosTest2() throws Throwable {
+        String macro1 = "define foreach_count(A, C) returns B {\n" +
+        "    $B = foreach $A generate group, COUNT($C);\n" +
+        "};\n";
+        
+        String macro2 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+            "    C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+            "    $B = foreach_count(C, $A);\n" +
+            "};\n";
+        
+        String macro3 = "define load_and_group() returns B {\n" +
+            "   alpha = load 'users' as (user, age, zip);\n" +
+            "   $B = group_and_partition(alpha, user, 30);\n" +
+            "};\n";
+        
+        String script = macro1 + macro2 + macro3 +
+            "gamma = load_and_group ();\n" +
+            "store gamma into 'byuser';\n";
+
+        StringReader rd = new StringReader(script);
+        String s = ParserUtil.expandMacros(rd);
+
+        validate(s);
+
+        String expected =
+            "\n\n\nmacro_load_and_group_alpha_0 = load 'users' as (user, age, zip);\n" +
+            "macro_load_and_group_C_0 = group macro_load_and_group_alpha_0 by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 30;\n" +
+            "gamma = foreach macro_load_and_group_C_0 generate group, COUNT(macro_load_and_group_alpha_0);\n\n" +
+            "store gamma into 'byuser';\n";
+
+        Assert.assertEquals(expected, s);
+    }
+    
+    @Test
+    public void sequenceMacrosTest() throws Throwable {
+        String macro1 = "define foreach_count(A, C) returns B {\n" +
+        "    $B = foreach $A generate group, COUNT($C);\n" +
+        "};\n";
+        
+        String macro2 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+            "    $B = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+            "};\n";
+        
+        String script = macro1 + macro2 +
+            "alpha = load 'users' as (user, age, zip);\n" +
+            "beta = group_and_partition (alpha, user, 20);\n" +
+            "gamma = foreach_count(beta, alpha);\n" +
+            "store gamma into 'byuser';\n";
+
+        StringReader rd = new StringReader(script);
+        String s = ParserUtil.expandMacros(rd);
+
+        validate(s);
+
+        String expected =
+            "\n\nalpha = load 'users' as (user, age, zip);\n" +
+            "beta = group alpha by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 20;\n\n" +
+            "gamma = foreach beta generate group, COUNT(alpha);\n\n" +
+            "store gamma into 'byuser';\n";
+
+        Assert.assertEquals(expected, s);
+    }
+    
+    @Test(expected = RuntimeException.class)
+    public void selfRecursiveTest() throws Throwable {
+        String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+            "    C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+            "    $B = group_and_partition(C, age, 34);" +
+            "};\n";
+        
+        String script = macro1 +
+            "alpha = load 'users' as (user, age, zip);\n" +
+            "gamma = group_and_partition (alpha, user, 23);\n" +
+            "store gamma into 'byuser';\n";
+
+        StringReader rd = new StringReader(script);
+        String s = ParserUtil.expandMacros(rd);
+
+        validate(s);
+    }
+    
+    @Test(expected = RuntimeException.class)
+    public void cyclicRecursiveTest() throws Throwable {
+        String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+            "    C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+            "    $B = foreach_count(C, $A);" +
+            "};\n";
+        
+        String macro2 = "define foreach_count(A, C) returns B {\n" +
+            "    $B = foreach $A generate group, COUNT($C);\n" +
+            "    D = group_and_partition($C, age, 23);" +
+            "};\n";
+        
+        
+        String script = macro2 + macro1 +
+            "alpha = load 'users' as (user, age, zip);\n" +
+            "gamma = group_and_partition (alpha, user, 23);\n" +
+            "store gamma into 'byuser';\n";
+
+        StringReader rd = new StringReader(script);
+        String s = ParserUtil.expandMacros(rd);
+
+        validate(s);
+    }
+
     private void validate(String s) throws Throwable {
         PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
         BufferedReader br = new BufferedReader(new StringReader(s));