You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/14 01:02:16 UTC
svn commit: r1300421 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/parser/QueryParserDriver.java
test/org/apache/pig/test/TestPigServer.java
Author: daijy
Date: Wed Mar 14 00:02:16 2012
New Revision: 1300421
URL: http://svn.apache.org/viewvc?rev=1300421&view=rev
Log:
PIG-2565: Support IMPORT for macros stored in S3 Buckets
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
pig/trunk/test/org/apache/pig/test/TestPigServer.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 14 00:02:16 2012
@@ -269,6 +269,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2565: Support IMPORT for macros stored in S3 Buckets (daijy)
+
PIG-2570: LimitOptimizer fails with dynamic LIMIT argument (daijy)
PIG-2543: PigStats.isSuccessful returns false if embedded pig script has sh commands (daijy)
Modified: pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java Wed Mar 14 00:02:16 2012
@@ -38,6 +38,8 @@ import org.antlr.runtime.tree.Tree;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
@@ -59,6 +61,8 @@ public class QueryParserDriver {
private Set<String> importSeen;
private Set<String> macroSeen;
+ private static Map<String, FetchFileRet> fnameMap = new HashMap<String, FetchFileRet>();
+
public QueryParserDriver(PigContext pigContext, String scope, Map<String, String> fileNameMap) {
this.pigContext = pigContext;
this.scope = scope;
@@ -337,6 +341,21 @@ public class QueryParserDriver {
}
}
+ private FetchFileRet getMacroFile(String fname) {
+ FetchFileRet localFileRet = null;
+ try {
+ if (fnameMap.get(fname)!=null) {
+ localFileRet = fnameMap.get(fname);
+ } else {
+ localFileRet = FileLocalizer.fetchFile(pigContext.getProperties(), fname);
+ fnameMap.put(fname, localFileRet);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to fetch macro file '" + fname + "'", e);
+ }
+ return localFileRet;
+ }
+
/*
* MacroDef node has two child nodes:
* 1. name
@@ -383,8 +402,10 @@ public class QueryParserDriver {
String body = bodyNode.getChild(0).getText();
body = body.substring(1, body.length() - 1);
+
+ FetchFileRet localFileRet = getMacroFile(fname);
- PigMacro pm = new PigMacro(mn, fname, params, returns, body, seen);
+ PigMacro pm = new PigMacro(mn, localFileRet.file.getAbsolutePath(), params, returns, body, seen);
try {
pm.validate();
@@ -418,10 +439,11 @@ public class QueryParserDriver {
throw new ParserException(msg);
}
+ FetchFileRet localFileRet = getMacroFile(fname);
BufferedReader in = null;
try {
- in = QueryParserUtils.getImportScriptAsReader(fname);
+ in = QueryParserUtils.getImportScriptAsReader(localFileRet.file.getAbsolutePath());
} catch (FileNotFoundException e) {
String msg = getErrorMessage(fname, t,
"Failed to import file '" + fname + "'", e.getMessage());
Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Wed Mar 14 00:02:16 2012
@@ -397,6 +397,27 @@ public class TestPigServer {
// clean-up
Assert.assertTrue(fs.delete(new Path(jarLocation), true));
}
+
+ @Test
+ public void testRegisterRemoteMacro() throws Throwable {
+ String macroName = "util.pig";
+ File macroFile = File.createTempFile("tmp", "");
+ PrintWriter pw = new PrintWriter(new FileWriter(macroFile));
+ pw.println("DEFINE row_count(X) RETURNS Z { Y = group $X all; $Z = foreach Y generate COUNT($X); };");
+ pw.close();
+
+ FileSystem fs = cluster.getFileSystem();
+ fs.copyFromLocalFile(new Path(macroFile.getAbsolutePath()), new Path(macroName));
+
+ Util.createInputFile(cluster, "testRegisterRemoteMacro_input", new String[]{"1", "2"});
+
+ pig.registerQuery("import 'util.pig';");
+ pig.registerQuery("a = load 'testRegisterRemoteMacro_input';");
+ pig.registerQuery("b = row_count(a);");
+ Iterator<Tuple> iter = pig.openIterator("b");
+
+ Assert.assertTrue(((Long)iter.next().get(0))==2);
+ }
@Test
public void testDescribeLoad() throws Throwable {