You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/02/15 02:09:01 UTC
svn commit: r1070714 - in /pig/trunk: build.xml
src/org/apache/pig/parser/AliasMasker.g
src/org/apache/pig/parser/MacroExpansion.g
src/org/apache/pig/parser/PigMacro.java
test/org/apache/pig/test/TestMacroExpansion.java
Author: rding
Date: Tue Feb 15 01:09:01 2011
New Revision: 1070714
URL: http://svn.apache.org/viewvc?rev=1070714&view=rev
Log:
PIG-1793: Add macro expansion to Pig Latin
Modified:
pig/trunk/build.xml
pig/trunk/src/org/apache/pig/parser/AliasMasker.g
pig/trunk/src/org/apache/pig/parser/MacroExpansion.g
pig/trunk/src/org/apache/pig/parser/PigMacro.java
pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Tue Feb 15 01:09:01 2011
@@ -320,7 +320,7 @@
description="generates token parser class from an ANTLR grammar">
<java classname="org.antlr.Tool"
classpathref="classpath" fork="true">
- <arg line="-o ${src.gen.dir}/${grammar.package.dir} ${src.dir}/${grammar.package.dir}/MacroExpansion.g ${src.dir}/${grammar.package.dir}/MacroImport.g ${src.dir}/${grammar.package.dir}/${grammar.name}Parser.g"/>
+ <arg line="-o ${src.gen.dir}/${grammar.package.dir} ${src.dir}/${grammar.package.dir}/MacroExpansion.g ${src.dir}/${grammar.package.dir}/MacroRecursion.g ${src.dir}/${grammar.package.dir}/MacroImport.g ${src.dir}/${grammar.package.dir}/${grammar.name}Parser.g"/>
</java>
</target>
Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Tue Feb 15 01:09:01 2011
@@ -75,6 +75,8 @@ private String getMask(String alias) {
private Set<String> params = new HashSet<String>();
+private Set<String> aliasSeen = new HashSet<String>();
+
private String macroName = "";
private long index = 0;
@@ -109,7 +111,8 @@ foreach_statement
: ^( STATEMENT ( alias { sb.append(" = "); } )? foreach_clause { sb.append("\n"); } )
;
-alias : IDENTIFIER { sb.append(getMask($IDENTIFIER.text)); }
+alias
+ : IDENTIFIER { sb.append(getMask($IDENTIFIER.text)); aliasSeen.add($IDENTIFIER.text); }
;
op_clause : define_clause
@@ -530,7 +533,13 @@ col_ref : alias_col_ref | dollar_col_ref
alias_col_ref
: GROUP { sb.append($GROUP.text); }
- | IDENTIFIER { sb.append($IDENTIFIER.text); }
+ | name = IDENTIFIER {
+ if (aliasSeen.contains($name.text)) {
+ sb.append(getMask($name.text));
+ } else {
+ sb.append($name.text);
+ }
+ }
;
dollar_col_ref
@@ -689,3 +698,4 @@ rel_str_op
| STR_OP_LTE { sb.append($STR_OP_LTE.text); }
| STR_OP_MATCHES { sb.append($STR_OP_MATCHES.text); }
;
+
Modified: pig/trunk/src/org/apache/pig/parser/MacroExpansion.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/MacroExpansion.g?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/MacroExpansion.g (original)
+++ pig/trunk/src/org/apache/pig/parser/MacroExpansion.g Tue Feb 15 01:09:01 2011
@@ -27,12 +27,11 @@ package org.apache.pig.parser;
import java.util.HashMap;
import java.util.List;
-import java.io.IOException;
import org.apache.pig.parser.PigMacro;
}
@members {
- private HashMap<String, PigMacro> memory = new HashMap<String, PigMacro>();
+ private Map<String, PigMacro> memory = new HashMap<String, PigMacro>();
private StringBuilder sb = new StringBuilder();
public String getResultString() { return sb.toString(); }
@@ -42,7 +41,7 @@ MACRO
: 'define' WS name=ALIAS WS? '(' ( params+=ALIAS (',' WS* params+=ALIAS)* )? ')' WS 'returns' WS rets+=ALIAS (',' WS* rets+=ALIAS)* WS '{' content=BLOCK '};'
{
PigMacro macro = new PigMacro($name.text);
- macro.setBody($content.text);
+ macro.setBody($content.text, memory);
if ($params != null) {
for (Object param : $params) {
macro.addParam(((Token)param).getText());
Modified: pig/trunk/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/PigMacro.java?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/trunk/src/org/apache/pig/parser/PigMacro.java Tue Feb 15 01:09:01 2011
@@ -18,22 +18,24 @@
package org.apache.pig.parser;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.antlr.runtime.ANTLRReaderStream;
import org.antlr.runtime.CharStream;
import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.Token;
import org.antlr.runtime.tree.CommonTreeNodeStream;
import org.antlr.runtime.tree.Tree;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
import org.apache.pig.tools.parameters.ParseException;
@@ -45,18 +47,22 @@ public class PigMacro {
private String body;
private List<String> params;
private List<String> rets;
+ private Map<String, PigMacro> seen;
private long idx = 0;
public PigMacro(String name) {
this.name = name;
this.params = new ArrayList<String>();
this.rets = new ArrayList<String>();
+ LOG.info("Macro '" + name + "' is defined");
}
- public void setBody(String body) {
+ public void setBody(String body, Map<String, PigMacro> seen) {
this.body = body;
+ this.seen = new HashMap<String, PigMacro>(seen);
+ expandBody();
}
-
+
public void addParam(String param) {
params.add(param);
}
@@ -74,8 +80,20 @@ public class PigMacro {
public List<String> getReturns() { return rets; }
public String inline(String[] inputs, String[] outputs) {
- ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(
- 50);
+ String in = substituteParams(inputs, outputs);
+ Set<String> masks = new HashSet<String>();
+ if (inputs != null) {
+ for (String s : inputs) {
+ masks.add(s);
+ }
+ }
+ for (String s : outputs) {
+ masks.add(s);
+ }
+ return maskAlias(in, masks);
+ }
+
+ public String substituteParams(String[] inputs, String[] outputs) {
if ((inputs == null && !params.isEmpty())
|| (inputs != null && inputs.length != params.size())) {
throw new RuntimeException("Failed to expand macro '" + name
@@ -83,26 +101,29 @@ public class PigMacro {
+ " actual number of inputs: "
+ ((inputs == null) ? 0 : inputs.length));
}
- if ((outputs == null && !rets.isEmpty())
- || (outputs != null && outputs.length != rets.size())) {
+ if (outputs == null || outputs.length != rets.size()) {
throw new RuntimeException("Failed to expand macro '" + name
+ "': expected number of return aliases: " + rets.size()
+ " actual number of return values: "
+ ((outputs == null) ? 0 : outputs.length));
}
- Set<String> masks = new HashSet<String>();
+
String[] args = new String[params.size() + rets.size()];
for (int i=0; i<params.size(); i++) {
- args[i] = params.get(i) + "=" + inputs[i];
- masks.add(inputs[i]);
+ String p = inputs[i];
+ p = p.startsWith("$") ? ("\\\\" + p) : p;
+ args[i] = params.get(i) + "=" + p;
}
for (int i=0; i<rets.size(); i++) {
- args[params.size() + i] = rets.get(i) + "=" + outputs[i];
- masks.add(outputs[i]);
+ String p = outputs[i];
+ p = p.startsWith("$") ? ("\\\\" + p) : p;
+ args[params.size() + i] = rets.get(i) + "=" + p;
}
StringWriter writer = new StringWriter();
BufferedReader in = new BufferedReader(new StringReader(body));
try {
+ ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(
+ 50);
psp.genSubstitutedFile(in, writer, args, null);
} catch (ParseException e) {
throw new RuntimeException(
@@ -110,10 +131,14 @@ public class PigMacro {
}
LOG.debug("--- after substition:\n" + writer.toString());
-
+
+ return writer.toString();
+ }
+
+ public String maskAlias(String in, Set<String> masks) {
String resultString = "";
try {
- CharStream input = new QueryParserStringStream(writer.toString());
+ CharStream input = new QueryParserStringStream(in);
QueryLexer lex = new QueryLexer(input);
CommonTokenStream tokens = new CommonTokenStream(lex);
@@ -141,4 +166,25 @@ public class PigMacro {
return resultString;
}
+ private void expandBody() {
+ // expand macros
+ boolean done = false;
+
+ while (!done) {
+ StringReader srd = new StringReader(body);
+ ANTLRReaderStream input;
+ try {
+ input = new ANTLRReaderStream(srd);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read ", e);
+ }
+ MacroRecursion expander = new MacroRecursion(input);
+ expander.setMacros(seen);
+ Token token = Token.EOF_TOKEN;
+ while ((token = expander.nextToken()) != Token.EOF_TOKEN);
+
+ body = expander.getResultString();
+ done = !expander.isExpanded();
+ }
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java?rev=1070714&r1=1070713&r2=1070714&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMacroExpansion.java Tue Feb 15 01:09:01 2011
@@ -1058,6 +1058,144 @@ public class TestMacroExpansion {
validate(s);
}
+
+ @Test
+ public void recursiveMacrosTest() throws Throwable {
+ String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+ " C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+ " $B = foreach_count(C, $A);" +
+ "};\n";
+
+ String macro2 = "define foreach_count(A, C) returns B {\n" +
+ " $B = foreach $A generate group, COUNT($C);\n" +
+ "};\n";
+
+
+ String script = macro2 + macro1 +
+ "alpha = load 'users' as (user, age, zip);\n" +
+ "gamma = group_and_partition (alpha, user, 23);\n" +
+ "store gamma into 'byuser';\n";
+
+ StringReader rd = new StringReader(script);
+ String s = ParserUtil.expandMacros(rd);
+
+ validate(s);
+
+ String expected =
+ "\n\nalpha = load 'users' as (user, age, zip);\n" +
+ "macro_group_and_partition_C_0 = group alpha by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 23;\n" +
+ "gamma = foreach macro_group_and_partition_C_0 generate group, COUNT(alpha);\n\n" +
+ "store gamma into 'byuser';\n";
+
+ Assert.assertEquals(expected, s);
+ }
+
+ @Test
+ public void recursiveMacrosTest2() throws Throwable {
+ String macro1 = "define foreach_count(A, C) returns B {\n" +
+ " $B = foreach $A generate group, COUNT($C);\n" +
+ "};\n";
+
+ String macro2 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+ " C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+ " $B = foreach_count(C, $A);\n" +
+ "};\n";
+
+ String macro3 = "define load_and_group() returns B {\n" +
+ " alpha = load 'users' as (user, age, zip);\n" +
+ " $B = group_and_partition(alpha, user, 30);\n" +
+ "};\n";
+
+ String script = macro1 + macro2 + macro3 +
+ "gamma = load_and_group ();\n" +
+ "store gamma into 'byuser';\n";
+
+ StringReader rd = new StringReader(script);
+ String s = ParserUtil.expandMacros(rd);
+
+ validate(s);
+
+ String expected =
+ "\n\n\nmacro_load_and_group_alpha_0 = load 'users' as (user, age, zip);\n" +
+ "macro_load_and_group_C_0 = group macro_load_and_group_alpha_0 by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 30;\n" +
+ "gamma = foreach macro_load_and_group_C_0 generate group, COUNT(macro_load_and_group_alpha_0);\n\n" +
+ "store gamma into 'byuser';\n";
+
+ Assert.assertEquals(expected, s);
+ }
+
+ @Test
+ public void sequenceMacrosTest() throws Throwable {
+ String macro1 = "define foreach_count(A, C) returns B {\n" +
+ " $B = foreach $A generate group, COUNT($C);\n" +
+ "};\n";
+
+ String macro2 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+ " $B = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+ "};\n";
+
+ String script = macro1 + macro2 +
+ "alpha = load 'users' as (user, age, zip);\n" +
+ "beta = group_and_partition (alpha, user, 20);\n" +
+ "gamma = foreach_count(beta, alpha);\n" +
+ "store gamma into 'byuser';\n";
+
+ StringReader rd = new StringReader(script);
+ String s = ParserUtil.expandMacros(rd);
+
+ validate(s);
+
+ String expected =
+ "\n\nalpha = load 'users' as (user, age, zip);\n" +
+ "beta = group alpha by (user) partition BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 20;\n\n" +
+ "gamma = foreach beta generate group, COUNT(alpha);\n\n" +
+ "store gamma into 'byuser';\n";
+
+ Assert.assertEquals(expected, s);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void selfRecursiveTest() throws Throwable {
+ String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+ " C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+ " $B = group_and_partition(C, age, 34);" +
+ "};\n";
+
+ String script = macro1 +
+ "alpha = load 'users' as (user, age, zip);\n" +
+ "gamma = group_and_partition (alpha, user, 23);\n" +
+ "store gamma into 'byuser';\n";
+
+ StringReader rd = new StringReader(script);
+ String s = ParserUtil.expandMacros(rd);
+
+ validate(s);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void cyclicRecursiveTest() throws Throwable {
+ String macro1 = "define group_and_partition (A, group_key, reducers) returns B {\n" +
+ " C = group $A by $group_key partition by org.apache.pig.test.utils.SimpleCustomPartitioner parallel $reducers;\n" +
+ " $B = foreach_count(C, $A);" +
+ "};\n";
+
+ String macro2 = "define foreach_count(A, C) returns B {\n" +
+ " $B = foreach $A generate group, COUNT($C);\n" +
+ " D = group_and_partition($C, age, 23);" +
+ "};\n";
+
+
+ String script = macro2 + macro1 +
+ "alpha = load 'users' as (user, age, zip);\n" +
+ "gamma = group_and_partition (alpha, user, 23);\n" +
+ "store gamma into 'byuser';\n";
+
+ StringReader rd = new StringReader(script);
+ String s = ParserUtil.expandMacros(rd);
+
+ validate(s);
+ }
+
private void validate(String s) throws Throwable {
PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
BufferedReader br = new BufferedReader(new StringReader(s));