You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/07/08 07:55:29 UTC
svn commit: r1144163 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/PigServer.java
src/org/apache/pig/impl/io/FileLocalizer.java
test/org/apache/pig/test/TestPigServer.java
Author: dvryaboy
Date: Fri Jul 8 05:55:29 2011
New Revision: 1144163
URL: http://svn.apache.org/viewvc?rev=1144163&view=rev
Log:
PIG-2142: Allow registering multiple jars from DFS via single statement
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
pig/trunk/test/org/apache/pig/test/TestPigServer.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jul 8 05:55:29 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2142: Allow registering multiple jars from DFS via single statement (rangadi via dvryaboy)
+
PIG-1926: Sample/Limit should take scalar (azaroth via thejas)
PIG-1950: e2e test harness needs to be able to compare to previous version of
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Jul 8 05:55:29 2011
@@ -67,6 +67,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -437,92 +438,29 @@ public class PigServer {
pigContext.registerStreamCmd(commandAlias, command);
}
- private void collectMatchedFiles(File startDir, String patten, List<URL> matchedFiles) {
- File[] files = startDir.listFiles();
- for (File file : files) {
- if(file.isFile() && file.getName().matches(patten) ){
- try {
- if(!matchedFiles.contains(file.toURI().toURL()))
- matchedFiles.add(file.toURI().toURL());
- } catch (MalformedURLException e) {
- // Should never happen
- }
- }
- }
- }
-
- private List<URL> locateJarFromResources(String jarName) throws IOException {
- // If jarName is a globbing, Pig only locate jars in local file system:
- // * if user give an absolute path, Pig only search the given path
- // * if user give a relative path, Pig search path relative to user working directory
- // If jarName is not globbing:
- // * first, if it is absolute path, Pig return the given path if exists
- // * second, Pig use getSystemResources to locate
- // * third, Pig search working directory
- // * next, Pig use FileLocalizer.fetchFile to try to locate external resource
-
- String workingDir = System.getProperty("user.dir");
- List<URL> matchedFiles = new ArrayList<URL>();
- if(jarName.contains("*") ){
- File givenPath = new File(jarName);
- // If relative path, make it relative to working directory
- if (!givenPath.isAbsolute()) {
- givenPath = new File(workingDir, jarName);
- }
- File parentDir = givenPath.getParentFile();
- String matchPatten = givenPath.getName();
- matchPatten = matchPatten.replaceAll("\\*", ".*");
- if (parentDir!=null) {
- collectMatchedFiles(parentDir, matchPatten, matchedFiles);
- } else if (workingDir!=null) {
- collectMatchedFiles(new File(workingDir), matchPatten, matchedFiles);
- }
+ private URL locateJarFromResources(String jarName) throws IOException {
+ Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
+ URL resourceLocation = null;
+
+ if (urls.hasMoreElements()) {
+ resourceLocation = urls.nextElement();
}
- else {
- if (new File(jarName).isAbsolute()) {
- File absoluteFile = new File(jarName);
- if (absoluteFile.exists())
- matchedFiles.add(absoluteFile.toURI().toURL());
- }
- // getSystemResources
- if (matchedFiles.size()==0) {
- Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
- while (urls.hasMoreElements()) {
- matchedFiles.add(urls.nextElement());
- }
- }
- // Search working directory
- if (matchedFiles.size()==0) {
- File file = new File(workingDir, jarName);
- if (file.exists())
- matchedFiles.add(file.toURI().toURL());
- }
- // Try FileLocalizer.fetchFile
- if (matchedFiles.size()==0) {
- File file = FileLocalizer.fetchFile(pigContext.getProperties(), jarName).file;
- if (!file.canRead()) {
- int errCode = 4002;
- String msg = "Can't read jar file: " + jarName;
- throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
- }
- matchedFiles.add(file.toURI().toURL());
- }
- // Check error condition
- if (matchedFiles.size()>1) {
- StringBuffer sb = new StringBuffer("Found multiple resources that match ");
- for (int i=1;i<matchedFiles.size();i++) {
- sb.append(matchedFiles.get(i));
- if (i!=matchedFiles.size()-1)
- sb.append(":");
- }
- log.debug(sb.toString());
- // remove extras
- for (int i=0;i<matchedFiles.size();i++) {
- matchedFiles.remove(1);
- }
+
+ if (urls.hasMoreElements()) {
+ StringBuffer sb = new StringBuffer("Found multiple resources that match ");
+ sb.append(jarName);
+ sb.append(": ");
+ sb.append(resourceLocation);
+
+ while (urls.hasMoreElements()) {
+ sb.append(urls.nextElement());
+ sb.append("; ");
}
+
+ log.debug(sb.toString());
}
- return matchedFiles;
+
+ return resourceLocation;
}
/**
@@ -537,10 +475,28 @@ public class PigServer {
* @throws IOException
*/
public void registerJar(String name) throws IOException {
+ // first try to locate jar via system resources
+ // if this fails, try by using "name" as File (this preserves
+ // compatibility with case when user passes absolute path or path
+ // relative to current working directory.)
if (name != null) {
- List<URL> resource = locateJarFromResources(name);
- for(int i=0; i< resource.size(); i++ ){
- pigContext.addJar(resource.get(i));
+ URL resource = locateJarFromResources(name);
+
+ if (resource == null) {
+ FetchFileRet[] files = FileLocalizer.fetchFiles(pigContext.getProperties(), name);
+
+ for(FetchFileRet file : files) {
+ File f = file.file;
+ if (!f.canRead()) {
+ int errCode = 4002;
+ String msg = "Can't read jar file: " + name;
+ throw new FrontendException(msg, errCode, PigException.USER_ENVIRONMENT);
+ }
+
+ pigContext.addJar(f.toURI().toURL());
+ }
+ } else {
+ pigContext.addJar(resource);
}
}
}
Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Fri Jul 8 05:55:29 2011
@@ -26,7 +26,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
@@ -734,37 +734,92 @@ public class FileLocalizer {
* directory.
*/
public static FetchFileRet fetchFile(Properties properties, String filePath) throws IOException {
- // Create URI from String.
- URI fileUri = null;
- try {
- fileUri = new URI(filePath);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- // If URI is a local file, verify it exists and return.
- if (((!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))) && (fileUri.getScheme() == null))
- || "file".equalsIgnoreCase(fileUri.getScheme())
- || "local".equalsIgnoreCase(fileUri.getScheme())) {
- File res = new File(fileUri.getPath());
- if (!res.exists()) {
- throw new ExecException("Local file '" + filePath + "' does not exist.", 101, PigException.INPUT);
- }
- return new FetchFileRet(res, false);
+ return fetchFilesInternal(properties, filePath, false)[0];
+ }
+
+ /**
+ * Ensures that the passed files pointed to by path are on the local file system,
+ * fetching them to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true
+ * and dfs is not null, then a relative path is assumed to be relative to the passed
+ * dfs active directory. Else they are assumed to be relative to the local working
+ * directory.
+ */
+ public static FetchFileRet[] fetchFiles(Properties properties, String filePath) throws IOException {
+ return fetchFilesInternal(properties, filePath, true);
+ }
+
+ /**
+ * Copies the files from remote to local filesystem.
+ * When 'multipleFiles' is set the path could point to multiple files
+ * through globs or a directory. In this case, return array contains multiple
+ * files, otherwise a single file is returned.
+ *
+ * If pig.jars.relative.to.dfs is true then a relative path is assumed to be
+ * relative to the default filesystem's active directory.
+ * Else they are assumed to be relative to the local working directory.
+ *
+ * @param properties
+ * @param filePath
+ * @param multipleFiles
+ * @return
+ */
+ private static FetchFileRet[] fetchFilesInternal(Properties properties,
+ String filePath,
+ boolean multipleFiles) throws IOException {
+
+ Path path = new Path(filePath);
+ URI uri = path.toUri();
+ Configuration conf = new Configuration();
+ ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
+
+ // if there is no schema or if the schema is "local", then it is
+ // expected to be a local path.
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FileSystem srcFs;
+ if ( (!"true".equals(properties.getProperty("pig.jars.relative.to.dfs"))
+ && uri.getScheme() == null )||
+ uri.getScheme().equals("local") ) {
+ srcFs = localFs;
} else {
-
- Path src = new Path(fileUri.getPath());
- File parent = (localTempDir != null) ? localTempDir : new File(System.getProperty("java.io.tmpdir"));
- File dest = new File(parent, src.getName());
- dest.deleteOnExit();
- try {
- Configuration configuration = new Configuration();
- ConfigurationUtil.mergeConf(configuration, ConfigurationUtil.toConfiguration(properties));
- FileSystem srcFs = FileSystem.get(fileUri, configuration);
- srcFs.copyToLocalFile(src, new Path(dest.getAbsolutePath()));
- } catch (IOException e) {
- throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e);
+ srcFs = path.getFileSystem(conf);
+ }
+
+ FileStatus[] files;
+
+ if (multipleFiles) {
+ files = srcFs.globStatus(path);
+ } else {
+ files = new FileStatus[]{ srcFs.getFileStatus(path) };
+ }
+ if (files == null || files.length == 0) {
+ throw new ExecException("file '" + filePath + "' does not exist.", 101, PigException.INPUT);
+ }
+
+ FetchFileRet[] fetchFiles = new FetchFileRet[files.length];
+ int idx = 0;
+
+ for(FileStatus file : files) {
+ // should throw an exception if this is not a file?
+
+ String pathname = file.getPath().toUri().getPath();
+ String filename = file.getPath().getName();
+
+ if (srcFs == localFs) {
+ fetchFiles[idx++] = new FetchFileRet(new File(pathname), false);
+ } else {
+ // fetch from remote:
+ File dest = new File(localTempDir, filename);
+ dest.deleteOnExit();
+ try {
+ srcFs.copyToLocalFile(file.getPath(), new Path(dest.getAbsolutePath()));
+ } catch (IOException e) {
+ throw new ExecException("Could not copy " + filePath + " to local destination " + dest, 101, PigException.INPUT, e);
+ }
+ fetchFiles[idx++] = new FetchFileRet(dest, true);
}
- return new FetchFileRet(dest, true);
}
+
+ return fetchFiles;
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1144163&r1=1144162&r2=1144163&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Fri Jul 8 05:55:29 2011
@@ -42,6 +42,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.impl.PigContext;
@@ -119,14 +122,19 @@ public class TestPigServer {
// creates an empty jar file
private static void createFakeJarFile(String location, String name)
- throws IOException {
- Assert.assertFalse((new File(name)).canRead());
-
- System.err. println("Location: " + location);
- new File(location).mkdirs();
-
- Assert.assertTrue((new File(location + FILE_SEPARATOR + name)).
- createNewFile());
+ throws IOException {
+ createFakeJarFile(location, name,
+ FileSystem.getLocal(cluster.getConfiguration()).getRaw());
+ }
+
+ // creates an empty jar file
+ private static void createFakeJarFile(String location, String name, FileSystem fs)
+ throws IOException {
+ System.err. println("Location: " + location + " name: " + name);
+ Path dir = new Path(location);
+ fs.mkdirs(dir);
+
+ Assert.assertTrue(fs.createNewFile(new Path(dir, name)));
}
// dynamically add more resources to the system class loader
@@ -360,6 +368,37 @@ public class TestPigServer {
}
@Test
+ public void testRegisterRemoteGlobbingJar() throws Throwable {
+ String dir = "test1_register_remote_jar_globbing";
+ String jarLocation = dir + FILE_SEPARATOR;
+ String jar1Name = "TestRegisterRemoteJarGlobbing1.jar";
+ String jar2Name = "TestRegisterRemoteJarGlobbing2.jar";
+
+ FileSystem fs = cluster.getFileSystem();
+ createFakeJarFile(jarLocation, jar1Name, fs);
+ createFakeJarFile(jarLocation, jar2Name, fs);
+
+ // find the absolute path for the directory so that it does not
+ // depend on configuration
+ String absPath = fs.getFileStatus(new Path(jarLocation)).getPath().toString();
+
+ boolean exceptionRaised = false;
+ try {
+ pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ exceptionRaised = true;
+ }
+ Assert.assertFalse(exceptionRaised);
+ verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
+ verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
+
+ // clean-up
+ Assert.assertTrue(fs.delete(new Path(jarLocation), true));
+ }
+
+ @Test
public void testDescribeLoad() throws Throwable {
PrintStream console = System.out;
PrintStream out = new PrintStream(new BufferedOutputStream(new FileOutputStream(stdOutRedirectedFile)));