You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/14 01:02:16 UTC

svn commit: r1300421 - in /pig/trunk: CHANGES.txt src/org/apache/pig/parser/QueryParserDriver.java test/org/apache/pig/test/TestPigServer.java

Author: daijy
Date: Wed Mar 14 00:02:16 2012
New Revision: 1300421

URL: http://svn.apache.org/viewvc?rev=1300421&view=rev
Log:
PIG-2565: Support IMPORT for macros stored in S3 Buckets

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 14 00:02:16 2012
@@ -269,6 +269,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2565: Support IMPORT for macros stored in S3 Buckets (daijy)
+
 PIG-2570: LimitOptimizer fails with dynamic LIMIT argument (daijy)
 
 PIG-2543: PigStats.isSuccessful returns false if embedded pig script has sh commands (daijy)

Modified: pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserDriver.java Wed Mar 14 00:02:16 2012
@@ -38,6 +38,8 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
@@ -59,6 +61,8 @@ public class QueryParserDriver {
     private Set<String> importSeen;
     private Set<String> macroSeen;
     
+    private static Map<String, FetchFileRet> fnameMap = new HashMap<String, FetchFileRet>();
+    
     public QueryParserDriver(PigContext pigContext, String scope, Map<String, String> fileNameMap) {
         this.pigContext = pigContext;
         this.scope = scope;
@@ -337,6 +341,21 @@ public class QueryParserDriver {
         }
     }
     
+    private FetchFileRet getMacroFile(String fname) {
+        synchronized (fnameMap) { // fnameMap is static: one cache shared by all drivers/threads
+            FetchFileRet localFileRet = fnameMap.get(fname);
+            if (localFileRet == null) { // first use: localize the (possibly remote, e.g. S3) file
+                try {
+                    localFileRet = FileLocalizer.fetchFile(pigContext.getProperties(), fname);
+                } catch (IOException e) {
+                    throw new RuntimeException("Unable to fetch macro file '" + fname + "'", e);
+                }
+                fnameMap.put(fname, localFileRet);
+            }
+            return localFileRet;
+        }
+    }
+    
     /*
      * MacroDef node has two child nodes:
      *      1. name
@@ -383,8 +402,10 @@ public class QueryParserDriver {
         String body = bodyNode.getChild(0).getText();
 
         body = body.substring(1, body.length() - 1);
+        
+        FetchFileRet localFileRet = getMacroFile(fname);
 
-        PigMacro pm = new PigMacro(mn, fname, params, returns, body, seen);
+        PigMacro pm = new PigMacro(mn, localFileRet.file.getAbsolutePath(), params, returns, body, seen);
         
         try {
             pm.validate();
@@ -418,10 +439,11 @@ public class QueryParserDriver {
             throw new ParserException(msg);
         }
         
+        FetchFileRet localFileRet = getMacroFile(fname);
         
         BufferedReader in = null;
         try {
-            in = QueryParserUtils.getImportScriptAsReader(fname);
+            in = QueryParserUtils.getImportScriptAsReader(localFileRet.file.getAbsolutePath());
         } catch (FileNotFoundException e) {
             String msg = getErrorMessage(fname, t,
                     "Failed to import file '" + fname + "'", e.getMessage());

Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1300421&r1=1300420&r2=1300421&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Wed Mar 14 00:02:16 2012
@@ -397,6 +397,27 @@ public class TestPigServer {
         // clean-up
         Assert.assertTrue(fs.delete(new Path(jarLocation), true));
     }
+    
+    @Test
+    public void testRegisterRemoteMacro() throws Throwable {
+        // Upload the macro to the cluster FS so IMPORT has to localize a non-local file.
+        String macroName = "util.pig";
+        File macroFile = File.createTempFile("tmp", "");
+        macroFile.deleteOnExit();
+        PrintWriter pw = new PrintWriter(new FileWriter(macroFile));
+        pw.println("DEFINE row_count(X) RETURNS Z { Y = group $X all; $Z = foreach Y generate COUNT($X); };");
+        pw.close();
+        FileSystem fs = cluster.getFileSystem();
+        fs.copyFromLocalFile(new Path(macroFile.getAbsolutePath()), new Path(macroName));
+        Util.createInputFile(cluster, "testRegisterRemoteMacro_input", new String[]{"1", "2"});
+        pig.registerQuery("import '" + macroName + "';");
+        pig.registerQuery("a = load 'testRegisterRemoteMacro_input';");
+        pig.registerQuery("b = row_count(a);");
+        Iterator<Tuple> iter = pig.openIterator("b");
+        Assert.assertEquals(2L, iter.next().get(0));
+        Assert.assertTrue(fs.delete(new Path(macroName), true)); // clean-up
+        Assert.assertTrue(fs.delete(new Path("testRegisterRemoteMacro_input"), true));
+    }
 
     @Test
     public void testDescribeLoad() throws Throwable {