You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/18 14:02:00 UTC
svn commit: r423058 -
/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
Author: cutting
Date: Tue Jul 18 05:01:59 2006
New Revision: 423058
URL: http://svn.apache.org/viewvc?rev=423058&view=rev
Log:
Further improvements to HADOOP-341.
Modified:
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=423058&r1=423057&r2=423058&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Jul 18 05:01:59 2006
@@ -21,6 +21,7 @@
import java.io.FileReader;
import java.io.IOException;
import java.io.File;
+import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
@@ -64,7 +65,7 @@
private static final String usage = "distcp "+
"[-fs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
"[-conf <config-file.xml>] " + "[-D <property=value>] "+
- "[-i] <srcurl> <desturl>";
+ "[-i] <srcurl> | -f <urilist_uri> <desturl>";
private static final long MIN_BYTES_PER_MAP = 1L << 28;
private static final int MAX_NUM_MAPS = 10000;
@@ -111,7 +112,7 @@
public abstract void cleanup(Configuration conf, JobConf jobConf,
String srcPath, String destPath) throws IOException;
- public String getFileSysName(URI url) {
+ public static String getFileSysName(URI url) {
String fsname = url.getScheme();
if ("dfs".equals(fsname)) {
String host = url.getHost();
@@ -127,7 +128,7 @@
* absPath is always assumed to descend from root.
* Otherwise returned path is null.
*/
- public Path makeRelative(Path root, Path absPath) {
+ public static Path makeRelative(Path root, Path absPath) {
if (!absPath.isAbsolute()) { return absPath; }
String sRoot = root.toString();
String sPath = absPath.toString();
@@ -612,6 +613,50 @@
}
}
+ private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException // Reads the list of source URIs, one per line, from the file, dfs or http resource named by srcListURI.
+ { // Returns the non-comment lines in the order read, or null when there are none; throws IOException for any other scheme.
+ ArrayList uris = new ArrayList(); // accumulates the accepted lines as Strings
+ BufferedReader fis = null; // assigned by exactly one of the scheme branches below
+
+ String srcListURIScheme = srcListURI.getScheme(); // selects how the list is opened
+ String srcListURIPath = srcListURI.getPath(); // path component only; used by the file and dfs branches
+
+ if("file".equals(srcListURIScheme)) { // constant-first equals, so a null scheme falls through to the final else
+ fis = new BufferedReader(new FileReader(srcListURIPath)); // local file, read with FileReader's default character encoding
+ } else if("dfs".equals(srcListURIScheme)) {
+ FileSystem fs = FileSystem.getNamed(CopyFilesMapper.getFileSysName(srcListURI), conf); // file system name is derived from the list URI itself
+ fis = new BufferedReader(
+ new InputStreamReader(new FSDataInputStream(fs, new Path(srcListURIPath), conf))
+ );
+ } else if("http".equals(srcListURIScheme)) { // exact match only: an "https" list URI ends up in the unsupported branch
+ //Open an HTTP GET connection and read the list from the response body
+ URL url = srcListURI.toURL();
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.connect();
+ // NOTE(review): the response code is not checked and the connection is never disconnected in this method
+ fis = new BufferedReader(
+ new InputStreamReader(connection.getInputStream())
+ );
+ } else {
+ throw new IOException("Unsupported source list uri!");
+ }
+
+ String uri = null;
+ while((uri = fis.readLine()) != null) { // readLine() returns null at end of stream
+ if(!uri.startsWith("#")) { // only lines starting with '#' are skipped; blank lines and surrounding whitespace are kept as-is
+ uris.add(uri);
+ }
+ }
+ fis.close(); // NOTE(review): not in a finally block, so the reader stays open if readLine() throws
+
+ if(!uris.isEmpty()) {
+ return (String[])uris.toArray(new String[0]);
+ }
+
+ return null; // null, not an empty array, signals "no entries"; NOTE(review): parseInputFile reads uris.length without a null check
+ }
+
/**
* Helper function to parse input file and return source urls for
* a given protocol.
@@ -619,21 +664,19 @@
* @param inputFilePath : The file containing the urls.
* @return
*/
- private static String[] parseInputFile(String protocol, String inputFilePath)
+ private static String[] parseInputFile(String protocol, String[] uris)
throws IOException
{
- ArrayList urls = new ArrayList();
- String url;
- BufferedReader fis = new BufferedReader(new FileReader(inputFilePath));
- while((url = fis.readLine()) != null) {
- if(!url.startsWith("#") && url.startsWith(protocol)) {
- urls.add(url);
+ ArrayList protocolURIs = new ArrayList();
+
+ for(int i=0; i < uris.length; ++i) {
+ if(uris[i].startsWith(protocol)) {
+ protocolURIs.add(uris[i]);
}
}
- fis.close();
- if(!urls.isEmpty()) {
- return (String[])urls.toArray(new String[0]);
+ if(!protocolURIs.isEmpty()) {
+ return (String[])protocolURIs.toArray(new String[0]);
}
return null;
@@ -653,27 +696,35 @@
JobConf jobConf = new JobConf(conf, CopyFiles.class);
jobConf.setJobName("distcp");
+ //Sanity check for srcPath/destPath
URI srcURI = null;
- //URI destURI = null;
try {
- if(!srcAsList) {
srcURI = new URI(srcPath);
- }
- //destURI = new URI(destPath);
} catch (URISyntaxException ex) {
throw new IOException("Illegal source path!");
}
+ URI destURI = null;
+ try {
+ destURI = new URI(destPath);
+ } catch (URISyntaxException ex) {
+ throw new IOException("Illegal destination path!");
+ }
+
//Source paths
String[] srcPaths = null;
+ if(srcAsList) {
+ srcPaths = fetchSrcURIs(conf, srcURI);
+ }
+
//Create the task-specific mapper
CopyFilesMapper mapper = null;
if(srcAsList) {
//Ugly?!
// Protocol - 'dfs://'
- String[] dfsUrls = parseInputFile("dfs", srcPath);
+ String[] dfsUrls = parseInputFile("dfs", srcPaths);
if(dfsUrls != null) {
for(int i=0; i < dfsUrls.length; ++i) {
copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
@@ -681,7 +732,7 @@
}
// Protocol - 'file://'
- String[] localUrls = parseInputFile("file", srcPath);
+ String[] localUrls = parseInputFile("file", srcPaths);
if(localUrls != null) {
for(int i=0; i < localUrls.length; ++i) {
copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
@@ -689,7 +740,7 @@
}
// Protocol - 'http://'
- String[] httpUrls = parseInputFile("http", srcPath);
+ String[] httpUrls = parseInputFile("http", srcPaths);
if(httpUrls != null) {
srcPaths = httpUrls;
mapper = CopyMapperFactory.getMapper("http");