You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/07/08 07:55:29 UTC

svn commit: r1144163 - in /pig/trunk: CHANGES.txt src/org/apache/pig/PigServer.java src/org/apache/pig/impl/io/FileLocalizer.java test/org/apache/pig/test/TestPigServer.java

Author: dvryaboy
Date: Fri Jul  8 05:55:29 2011
New Revision: 1144163

URL: http://svn.apache.org/viewvc?rev=1144163&view=rev
Log:
PIG-2142: Allow registering multiple jars from DFS via single statement

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jul  8 05:55:29 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2142: Allow registering multiple jars from DFS via single statement (rangadi via dvryaboy)
+
 PIG-1926: Sample/Limit should take scalar (azaroth via thejas)
 
 PIG-1950: e2e test harness needs to be able to compare to previous version of

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Jul  8 05:55:29 2011
@@ -67,6 +67,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -437,92 +438,29 @@ public class PigServer {
         pigContext.registerStreamCmd(commandAlias, command);
     }
 
-    private void collectMatchedFiles(File startDir, String patten, List<URL> matchedFiles) {
-        File[] files = startDir.listFiles();
-        for (File file : files) {
-            if(file.isFile() && file.getName().matches(patten) ){
-                try {
-                    if(!matchedFiles.contains(file.toURI().toURL()))
-                        matchedFiles.add(file.toURI().toURL());
-                } catch (MalformedURLException e) {
-                    // Should never happen
-                }
-            }
-        }
-    }
-    
-    private List<URL> locateJarFromResources(String jarName) throws IOException {
-        // If jarName is a globbing, Pig only locate jars in local file system:
-        // * if user give an absolute path, Pig only search the given path
-        // * if user give a relative path, Pig search path relative to user working directory
-        // If jarName is not globbing:
-        // * first, if it is absolute path, Pig return the given path if exists
-        // * second, Pig use getSystemResources to locate
-        // * third, Pig search working directory
-        // * next, Pig use FileLocalizer.fetchFile to try to locate external resource
-        
-        String workingDir = System.getProperty("user.dir");
-        List<URL> matchedFiles = new ArrayList<URL>();
-        if(jarName.contains("*") ){
-            File givenPath = new File(jarName);
-            // If relative path, make it relative to working directory
-            if (!givenPath.isAbsolute()) {
-                givenPath = new File(workingDir, jarName);
-            }
-            File parentDir = givenPath.getParentFile();
-            String matchPatten = givenPath.getName();
-            matchPatten = matchPatten.replaceAll("\\*", ".*");
-            if (parentDir!=null) {
-                collectMatchedFiles(parentDir, matchPatten, matchedFiles);
-            } else if (workingDir!=null) {
-                collectMatchedFiles(new File(workingDir), matchPatten, matchedFiles);
-            }
+    private URL locateJarFromResources(String jarName) throws IOException {
+        Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
+        URL resourceLocation = null;
+
+        if (urls.hasMoreElements()) {
+            resourceLocation = urls.nextElement();
         }
-        else {
-            if (new File(jarName).isAbsolute()) {
-                File absoluteFile = new File(jarName);
-                if (absoluteFile.exists())
-                    matchedFiles.add(absoluteFile.toURI().toURL());
-            }
-            // getSystemResources
-            if (matchedFiles.size()==0) {
-                Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
-                while (urls.hasMoreElements()) {
-                    matchedFiles.add(urls.nextElement());
-                }
-            }
-            // Search working directory
-            if (matchedFiles.size()==0) {
-                File file = new File(workingDir, jarName);
-                if (file.exists())
-                    matchedFiles.add(file.toURI().toURL());
-            }
-            // Try FileLocalizer.fetchFile
-            if (matchedFiles.size()==0) {
-                File file = FileLocalizer.fetchFile(pigContext.getProperties(), jarName).file;
-                if (!file.canRead()) {
-                    int errCode = 4002;
-                    String msg = "Can't read jar file: " + jarName;
-                    throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
-                }
-                matchedFiles.add(file.toURI().toURL());
-            }
-            // Check error condition
-            if (matchedFiles.size()>1) {
-                StringBuffer sb = new StringBuffer("Found multiple resources that match ");
-                for (int i=1;i<matchedFiles.size();i++) {
-                    sb.append(matchedFiles.get(i));
-                    if (i!=matchedFiles.size()-1)
-                        sb.append(":");
-                }
-                log.debug(sb.toString());
-                // remove extras
-                for (int i=0;i<matchedFiles.size();i++) {
-                    matchedFiles.remove(1);
-                }
+
+        if (urls.hasMoreElements()) {
+            StringBuffer sb = new StringBuffer("Found multiple resources that match ");
+            sb.append(jarName);
+            sb.append(": ");
+            sb.append(resourceLocation);
+
+            while (urls.hasMoreElements()) {
+                sb.append(urls.nextElement());
+                sb.append("; ");
             }
+
+            log.debug(sb.toString());
         }
-        return matchedFiles;
+
+        return resourceLocation;
     }
     
     /**
@@ -537,10 +475,28 @@ public class PigServer {
      * @throws IOException
      */
     public void registerJar(String name) throws IOException {
+        // First try to locate the jar via system resources.
+        // If this fails, try using "name" as a file path (this preserves
+        // compatibility with the case where the user passes an absolute path
+        // or a path relative to the current working directory).
         if (name != null) {
-            List<URL> resource = locateJarFromResources(name);
-            for(int i=0; i< resource.size(); i++ ){
-                pigContext.addJar(resource.get(i)); 
+            URL resource = locateJarFromResources(name);
+
+            if (resource == null) {
+                FetchFileRet[] files = FileLocalizer.fetchFiles(pigContext.getProperties(), name);
+
+                for(FetchFileRet file : files) {
+                  File f = file.file;
+                  if (!f.canRead()) {
+                    int errCode = 4002;
+                    String msg = "Can't read jar file: " + name;
+                    throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
+                  }
+
+                  pigContext.addJar(f.toURI().toURL());
+                }
+            } else {
+              pigContext.addJar(resource);
             }
         }
     }

Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Fri Jul  8 05:55:29 2011
@@ -26,7 +26,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
@@ -734,37 +734,92 @@ public class FileLocalizer {
      * directory.
      */
     public static FetchFileRet fetchFile(Properties properties, String filePath) throws IOException {
-        // Create URI from String.
-        URI fileUri = null;
-        try {
-            fileUri = new URI(filePath);
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
-        // If URI is a local file, verify it exists and return.
-        if (((!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))) && (fileUri.getScheme() == null))
-                || "file".equalsIgnoreCase(fileUri.getScheme())
-                || "local".equalsIgnoreCase(fileUri.getScheme())) {
-            File res = new File(fileUri.getPath());
-            if (!res.exists()) {
-                throw new ExecException("Local file '" + filePath + "' does not exist.", 101, PigException.INPUT);
-            }
-            return new FetchFileRet(res, false);
+        return fetchFilesInternal(properties, filePath, false)[0];
+    }
+
+    /**
+     * Ensures that the files matched by the passed path (which may contain globs) are
+     * on the local file system, fetching them to localTempDir if necessary. If
+     * pig.jars.relative.to.dfs is true, then a relative path is assumed to be relative
+     * to the default filesystem's active directory. Else it is assumed to be relative
+     * to the local working directory.
+     */
+    public static FetchFileRet[] fetchFiles(Properties properties, String filePath) throws IOException {
+        return fetchFilesInternal(properties, filePath, true);
+    }
+
+    /**
+     * Copies the files from remote to local filesystem.
+     * When 'multipleFiles' is set the path could point to multiple files
+     * through globs or a directory. In this case, return array contains multiple
+     * files, otherwise a single file is returned.
+     *
+     * If pig.jars.relative.to.dfs is true then a relative path is assumed to be
+     * relative to the default filesystem's active directory.
+     * Else they are assumed to be relative to the local working directory.
+     *
+     * @param properties
+     * @param filePath
+     * @param multipleFiles
+     * @return
+     */
+    private static FetchFileRet[] fetchFilesInternal(Properties properties,
+                                            String filePath,
+                                            boolean multipleFiles) throws IOException {
+
+        Path path = new Path(filePath);
+        URI uri = path.toUri();
+        Configuration conf = new Configuration();
+        ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
+
+        // if there is no scheme or if the scheme is "local", then it is
+        // expected to be a local path.
+
+        FileSystem localFs = FileSystem.getLocal(conf);
+        FileSystem srcFs;
+        if ( (!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))
+                && uri.getScheme() == null )||
+                uri.getScheme().equals("local") ) {
+            srcFs = localFs;
         } else {
-            
-            Path src = new Path(fileUri.getPath());
-            File parent = (localTempDir != null) ? localTempDir : new File(System.getProperty("java.io.tmpdir")); 
-            File dest = new File(parent, src.getName());
-            dest.deleteOnExit();
-            try {
-                Configuration configuration = new Configuration();
-                ConfigurationUtil.mergeConf(configuration, ConfigurationUtil.toConfiguration(properties));
-                FileSystem srcFs = FileSystem.get(fileUri, configuration);
-                srcFs.copyToLocalFile(src, new Path(dest.getAbsolutePath()));
-            } catch (IOException e) {
-                throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e);
+            srcFs = path.getFileSystem(conf);
+        }
+
+        FileStatus[] files;
+
+        if (multipleFiles) {
+            files = srcFs.globStatus(path);
+        } else {
+            files = new FileStatus[]{ srcFs.getFileStatus(path) };
+        }
+        if (files == null || files.length == 0) {
+            throw new ExecException("file '" + filePath + "' does not exist.", 101, PigException.INPUT);
+        }
+
+        FetchFileRet[] fetchFiles = new FetchFileRet[files.length];
+        int idx = 0;
+
+        for(FileStatus file : files) {
+            // should throw an exception if this is not a file?
+
+            String pathname = file.getPath().toUri().getPath();
+            String filename = file.getPath().getName();
+
+            if (srcFs == localFs) {
+                fetchFiles[idx++] = new FetchFileRet(new File(pathname), false);
+            } else {
+                // fetch from remote:
+                File dest = new File(localTempDir, filename);
+                dest.deleteOnExit();
+                try {
+                    srcFs.copyToLocalFile(file.getPath(), new Path(dest.getAbsolutePath()));
+                } catch (IOException e) {
+                    throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e);
+                }
+                fetchFiles[idx++] = new FetchFileRet(dest, true);
             }
-            return new FetchFileRet(dest, true);
         }
+
+        return fetchFiles;
     }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Fri Jul  8 05:55:29 2011
@@ -42,6 +42,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.impl.PigContext;
 
 
@@ -119,14 +122,19 @@ public class TestPigServer {
     
     // creates an empty jar file
     private static void createFakeJarFile(String location, String name) 
-            throws IOException {
-        Assert.assertFalse((new File(name)).canRead());
-        
-        System.err. println("Location: " + location);
-        new File(location).mkdirs();
-        
-        Assert.assertTrue((new File(location + FILE_SEPARATOR + name)).
-                    createNewFile());
+                                          throws IOException {
+        createFakeJarFile(location, name,
+                FileSystem.getLocal(cluster.getConfiguration()).getRaw());
+    }
+
+    // creates an empty jar file
+    private static void createFakeJarFile(String location, String name, FileSystem fs)
+                                          throws IOException {
+        System.err. println("Location: " + location + " name: " + name);
+        Path dir = new Path(location);
+        fs.mkdirs(dir);
+
+        Assert.assertTrue(fs.createNewFile(new Path(dir, name)));
     }
     
     // dynamically add more resources to the system class loader
@@ -360,6 +368,37 @@ public class TestPigServer {
     }
 
     @Test
+    public void testRegisterRemoteGlobbingJar() throws Throwable {
+        String dir = "test1_register_remote_jar_globbing";
+        String jarLocation = dir + FILE_SEPARATOR;
+        String jar1Name = "TestRegisterRemoteJarGlobbing1.jar";
+        String jar2Name = "TestRegisterRemoteJarGlobbing2.jar";
+
+        FileSystem fs = cluster.getFileSystem();
+        createFakeJarFile(jarLocation, jar1Name, fs);
+        createFakeJarFile(jarLocation, jar2Name, fs);
+
+        // find the absolute path for the directory so that it does not
+        // depend on configuration
+        String absPath = fs.getFileStatus(new Path(jarLocation)).getPath().toString();
+
+        boolean exceptionRaised = false;
+        try {
+            pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+            exceptionRaised = true;
+        }
+        Assert.assertFalse(exceptionRaised);
+        verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
+        verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
+
+        // clean-up
+        Assert.assertTrue(fs.delete(new Path(jarLocation), true));
+    }
+
+    @Test
     public void testDescribeLoad() throws Throwable {
         PrintStream console = System.out;
         PrintStream out = new PrintStream(new BufferedOutputStream(new FileOutputStream(stdOutRedirectedFile)));