You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/07/12 19:33:28 UTC

svn commit: r963377 - in /hadoop/pig/branches/branch-0.7: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/

Author: rding
Date: Mon Jul 12 17:33:28 2010
New Revision: 963377

URL: http://svn.apache.org/viewvc?rev=963377&view=rev
Log:
PIG-1490: Make Pig storers work with remote HDFS in secure mode

Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Mon Jul 12 17:33:28 2010
@@ -196,6 +196,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1490: Make Pig storers work with remote HDFS in secure mode (rding)
+
 PIG-1484: BinStorage should support comma seperated path (daijy)
 
 PIG-1467: order by fail when set "fs.file.impl.disable.cache" to true (daijy)

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java Mon Jul 12 17:33:28 2010
@@ -162,8 +162,8 @@ public abstract class StoreFunc implemen
      */
     public static void cleanupOnFailureImpl(String location, Job job) 
     throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
         Path path = new Path(location);
+        FileSystem fs = path.getFileSystem(job.getConfiguration());       
         if(fs.exists(path)){
             fs.delete(path, true);
         }    

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Jul 12 17:33:28 2010
@@ -504,9 +504,9 @@ public class MapReduceLauncher extends L
     }
     
     private void createSuccessFile(Job job, POStore store) throws IOException {
-        if(shouldMarkOutputDir(job)) {
-            FileSystem fs = FileSystem.get(job.getJobConf());
+        if(shouldMarkOutputDir(job)) {      
             Path outputPath = new Path(store.getSFile().getFileName());
+            FileSystem fs = outputPath.getFileSystem(job.getJobConf());
             if(fs.exists(outputPath)){
                 // create a file in the folder to mark it
                 Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jul 12 17:33:28 2010
@@ -504,6 +504,41 @@ public class QueryParser {
 	     return result;
 	 }
 
+    void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
+        // Adds the remote hosts found for absolutePath to "mapreduce.job.hdfs-servers"; first, get the default-FS host.
+        String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
+        URI defaultFSURI = new URI(defaultFS); // NOTE(review): new URI(null) throws NullPointerException if the property is unset
+        String defaultHost = defaultFSURI.getHost();
+        if (defaultHost == null) defaultHost = "";
+        // A URI without a host component is represented by the empty string; the host is lower-cased before use.
+        defaultHost = defaultHost.toLowerCase();
+        // getRemoteHosts is defined outside this hunk; presumably it returns the remote servers referenced by absolutePath, excluding defaultHost - confirm.
+        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
+        // Read the current comma-separated server list; an unset property is treated as an empty list.
+        String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
+        if (hdfsServersString == null) hdfsServersString = "";
+        String hdfsServers[] = hdfsServersString.split(",");
+        // Append every remote host that does not equal one of the entries read above.
+        for (String remoteHost : remoteHosts) {
+            boolean existing = false;
+            for (String hdfsServer : hdfsServers) {
+                if (hdfsServer.equals(remoteHost)) {
+                    existing = true;
+                }
+            }
+            if (!existing) {
+                if (!hdfsServersString.isEmpty()) {
+                    hdfsServersString = hdfsServersString + ",";
+                }
+                hdfsServersString = hdfsServersString + remoteHost;
+            }
+        }
+        // Write the property back only when the resulting list is non-empty.
+        if (!hdfsServersString.isEmpty()) {
+            pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
+        }
+    }
+     
      // Check and set files to be automatically shipped for the given StreamingCommand
      // Auto-shipping rules:
      // 1. If the command begins with either perl or python assume that the 
@@ -1388,36 +1423,8 @@ LogicalOperator LoadClause(LogicalPlan l
                 absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext));
                 
                 if (absolutePath!=null) {
-	                // Get native host
-	                String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
-	                URI defaultFSURI = new URI(defaultFS);
-	                String defaultHost = defaultFSURI.getHost();
-	                if (defaultHost==null)
-	                    defaultHost="";
-	                defaultHost = defaultHost.toLowerCase();
-	
-	                Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-	                
-	                String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
-	                if (hdfsServersString==null) hdfsServersString="";
-	                String hdfsServers[] = hdfsServersString.split(",");
-	                
-	                for (String remoteHost : remoteHosts) {
-	                    boolean existing = false;
-	                    for (String hdfsServer:hdfsServers) {
-	                        if (hdfsServer.equals(remoteHost))
-	                            existing = true;
-	                    }
-	                    if (!existing) {
-	                        if (!hdfsServersString.isEmpty())
-	                            hdfsServersString = hdfsServersString + ",";
-	                        hdfsServersString = hdfsServersString + remoteHost;
-	                    }
-	                }
-	
-	                if (!hdfsServersString.isEmpty())
-	                    pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
-	            }
+                    setHdfsServers(absolutePath, pigContext);
+                }
                 fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath);
             }
             lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec),
@@ -2542,6 +2549,9 @@ LogicalOperator StoreClause(LogicalPlan 
         String absolutePath = fileNameMap.get(constructFileNameSignature(fileName, funcSpec));
         if (absolutePath == null) {
             absolutePath = stoFunc.relToAbsPathForStoreLocation(fileName, getCurrentDir(pigContext));
+            if (absolutePath != null) {
+                setHdfsServers(absolutePath, pigContext);
+            }
             fileNameMap.put(constructFileNameSignature(fileName, funcSpec), absolutePath);   
         }
         LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),

Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java Mon Jul 12 17:33:28 2010
@@ -109,6 +109,40 @@ protected final Log log = LogFactory.get
                     pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
 
         } catch (IOException io) {
+            fail(io.getMessage());
+        }
+    }
+    
+    @Test
+    public void testRemoteServerList2() throws ExecException, IOException {
+        try {
+            Properties pigProperties = pigServer.getPigContext().getProperties();
+            // Checks how store statements affect the "mapreduce.job.hdfs-servers" property.
+            pigServer.setBatchOn();
+            // Load and store with paths that name no remote server: the property is expected to stay unset.
+            pigServer.registerQuery("a = load '/user/pig/1.txt';");
+            pigServer.registerQuery("store a into '/user/pig/1.txt';");
+            
+            System.out.println("hdfs-servers: " + pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+            // Storing to an hdfs:// URI that names host b.com is expected to add "hdfs://b.com" to the property.
+            pigServer.registerQuery("store a into 'hdfs://b.com/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://b.com"));
+            // A har:// location written as har://hdfs-c.com:8020 is expected to be recorded as "hdfs://c.com:8020".
+            pigServer.registerQuery("store a into 'har://hdfs-c.com:8020/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://c.com:8020"));
+            // A host given with an explicit port is expected to be recorded together with that port.
+            pigServer.registerQuery("store a into 'hdfs://d.com:8020/user/pig/1.txt';");
+            System.out.println("hdfs-servers: " + pigProperties.getProperty("mapreduce.job.hdfs-servers"));
+            assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                    pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
+            // NOTE(review): the catch below reports only the message via fail(), dropping the stack trace, although the method already declares IOException.
+        } catch (IOException io) {
+            fail(io.getMessage());
         }
     }
 }