You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/31 21:13:33 UTC
svn commit: r469635 - in /lucene/hadoop/trunk: ./ src/contrib/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache...
Author: cutting
Date: Tue Oct 31 12:13:31 2006
New Revision: 469635
URL: http://svn.apache.org/viewvc?view=rev&rev=469635
Log:
HADOOP-576. Enable contrib/streaming to use the file cache. Contributed by Mahadev.
Added:
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/contrib/build-contrib.xml
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 31 12:13:31 2006
@@ -87,6 +87,10 @@
reduce tasks which use MapFile to still report progress while
writing blocks to the filesystem. (cutting)
+24. HADOOP-576. Enable contrib/streaming to use the file cache. Also
+ extend the cache to permit symbolic links to cached items, rather
+ than local file copies. (Mahadev Konar via cutting)
+
Release 0.7.2 - 2006-10-18
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Oct 31 12:13:31 2006
@@ -350,8 +350,8 @@
<target name="test-contrib" depends="compile-core, compile-core-test">
<subant target="test">
- <fileset file="${basedir}/src/contrib/build.xml"/>
- </subant>
+ <fileset file="${basedir}/src/contrib/build.xml"/>
+ </subant>
</target>
<target name="test" depends="test-core, test-contrib">
Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Tue Oct 31 12:13:31 2006
@@ -24,9 +24,10 @@
<property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
<property name="build.classes" location="${build.dir}/classes"/>
<property name="build.test" location="${build.dir}/test"/>
+ <property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
<!-- all jars together -->
<property name="deploy.dir" location="${hadoop.root}/build/"/>
-
+ <property name="minimr.dir" value="${hadoop.root}/build/minimr"/>
<property name="javac.deprecation" value="off"/>
<property name="javac.debug" value="on"/>
@@ -51,6 +52,7 @@
<path id="test.classpath">
<pathelement location="${build.test}" />
<pathelement location="${hadoop.root}/build/test/classes"/>
+ <pathelement location="${minimr.dir}" />
<pathelement location="${hadoop.root}/src/test"/>
<pathelement location="${conf.dir}"/>
<pathelement location="${hadoop.root}/build"/>
@@ -69,7 +71,8 @@
<mkdir dir="${build.dir}"/>
<mkdir dir="${build.classes}"/>
<mkdir dir="${build.test}"/>
-
+ <mkdir dir="${hadoop.log.dir}"/>
+ <mkdir dir="${minimr.dir}"/>
<antcall target="init-contrib"/>
</target>
@@ -131,8 +134,10 @@
<!-- ================================================================== -->
<target name="test" depends="compile-test, deploy" if="test.available">
<echo message="contrib: ${name}"/>
+ <delete dir="${hadoop.log.dir}"/>
+ <mkdir dir="${hadoop.log.dir}"/>
<junit
- printsummary="withOutAndErr" haltonfailure="no" fork="yes"
+ printsummary="withOutAndErr" showoutput="no" haltonfailure="no" fork="yes"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${build.test}/data"/>
@@ -145,7 +150,7 @@
<sysproperty key="fs.default.name" value="${fs.default.name}"/>
<sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
-
+ <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
<classpath refid="test.classpath"/>
<formatter type="plain" />
<batchtest todir="${build.test}" unless="testcase">
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Tue Oct 31 12:13:31 2006
@@ -22,8 +22,12 @@
import java.net.InetAddress;
import java.util.*;
-/*
- * If we move to Java 1.5, we can get rid of this class and just use System.getenv
+/**
+ * This is a class used to get the current environment
+ * on the host machines running the map/reduce. This class
+ * assumes that setting the environment in streaming is
+ * allowed on windows/aix/linux/freebsd/sunos/solaris/hp-ux
+ * @author michel
*/
public class Environment extends Properties {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Tue Oct 31 12:13:31 2006
@@ -21,6 +21,8 @@
import java.io.IOException;
/** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
+ * or bin/hadoop jar hadoop-streaming.jar args.
+ * It passes all the args to StreamJob which handles all the arguments.
*/
public class HadoopStreaming {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Tue Oct 31 12:13:31 2006
@@ -23,6 +23,16 @@
import java.util.jar.*;
import java.util.zip.ZipException;
+/**
+ * This class is the main class for generating job.jar
+ * for Hadoop Streaming jobs. It includes the files specified
+ * with the -file option and includes them in the jar. Also,
+ * hadoop-streaming is a user level application, so all the classes
+ * with hadoop-streaming that are needed in the job are also included
+ * in the job.jar.
+ * @author michel
+ *
+ */
public class JarBuilder {
public JarBuilder() {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Tue Oct 31 12:13:31 2006
@@ -30,7 +30,9 @@
*
* Note: not specifying ownerOnly maps to ownerOnly = false
* From man chmod: If no user specs are given, the effect is as if `a' were given.
- *
+ * This class is mainly used to change permissions when files are unjarred from the
+ * job.jar. The executable specified in the mapper/reducer is set to be executable
+ * using this class.
*/
public class MustangFile extends File {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Oct 31 12:13:31 2006
@@ -39,7 +39,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
-
+import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
/** All the client-side work happens here.
* (Jar packaging, MapRed job submission and monitoring)
* @author Michel Tourn
@@ -54,7 +55,13 @@
argv_ = argv;
mayExit_ = mayExit;
}
-
+
+ /**
+ * This is the method that actually
+ * initializes the job conf and submits the job
+ * to the jobtracker
+ * @throws IOException
+ */
public void go() throws IOException {
init();
@@ -65,7 +72,7 @@
setJobConf();
submitAndMonitorJob();
}
-
+
protected void init() {
try {
env_ = new Environment();
@@ -157,6 +164,10 @@
return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
}
+ /**
+ * This method parses the command line args
+ * to a hadoop streaming job
+ */
void parseArgv() {
if (argv_.length == 0) {
exitUsage(false);
@@ -219,7 +230,22 @@
} else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
i++;
inReaderSpec_ = s;
- } else {
+ } else if((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
+ i++;
+ if (cacheArchives == null)
+ cacheArchives = s;
+ else
+ cacheArchives = cacheArchives + "," + s;
+ } else if((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
+ i++;
+ System.out.println(" the val of s is " + s);
+ if (cacheFiles == null)
+ cacheFiles = s;
+ else
+ cacheFiles = cacheFiles + "," + s;
+ System.out.println(" the val of cachefiles is " + cacheFiles);
+ }
+ else {
System.err.println("Unexpected argument: " + argv_[i]);
exitUsage(false);
}
@@ -269,6 +295,8 @@
System.out.println(" -inputreader <spec> Optional.");
System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property");
System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
+ System.out.println(" -cacheFile fileNameURI");
+ System.out.println(" -cacheArchive fileNameURI");
System.out.println(" -verbose");
System.out.println();
if (!detailed) {
@@ -392,7 +420,7 @@
// $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
// where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
if (runtimeClasses == null) {
runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
@@ -435,6 +463,11 @@
return jobJarName;
}
+ /**
+ * This method sets the user jobconf variable specified
+ * by user using -jobconf key=value
+ * @param doEarlyProps
+ */
protected void setUserJobConfProps(boolean doEarlyProps) {
Iterator it = userJobConfProps_.keySet().iterator();
while (it.hasNext()) {
@@ -448,7 +481,17 @@
}
}
}
-
+
+ /**
+ * get the uris of all the files/caches
+ */
+ protected void getURIs(String lcacheArchives, String lcacheFiles) {
+ String archives[] = StringUtils.getStrings(lcacheArchives);
+ String files[] = StringUtils.getStrings(lcacheFiles);
+ fileURIs = StringUtils.stringToURI(files);
+ archiveURIs = StringUtils.stringToURI(archives);
+ }
+
protected void setJobConf() throws IOException {
msg("hadoopAliasConf_ = " + hadoopAliasConf_);
config_ = new Configuration();
@@ -548,7 +591,8 @@
jobConf_.set(k, v);
}
}
-
+
+ setUserJobConfProps(false);
// output setup is done late so we can customize for reducerNone_
//jobConf_.setOutputDir(new File(output_));
setOutputSpec();
@@ -561,20 +605,36 @@
// last, allow user to override anything
// (although typically used with properties we didn't touch)
- setUserJobConfProps(false);
jar_ = packageJobJar();
if (jar_ != null) {
jobConf_.setJar(jar_);
}
-
+
+ if ((cacheArchives != null) || (cacheFiles != null)){
+ getURIs(cacheArchives, cacheFiles);
+ boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
+ if (!b)
+ fail(LINK_URI);
+ DistributedCache.createSymlink(jobConf_);
+ }
+ // set the jobconf for the caching parameters
+ if (cacheArchives != null)
+ DistributedCache.setCacheArchives(archiveURIs, jobConf_);
+ if (cacheFiles != null)
+ DistributedCache.setCacheFiles(fileURIs, jobConf_);
+
if(verbose_) {
listJobConfProperties();
}
-
+
msg("submitting to jobconf: " + getJobTrackerHostPort());
}
+ /**
+ * Prints out the jobconf properties on stdout
+ * when verbose is specified.
+ */
protected void listJobConfProperties()
{
msg("==== JobConf properties:");
@@ -765,6 +825,10 @@
protected String comCmd_;
protected String redCmd_;
protected String cluster_;
+ protected String cacheFiles;
+ protected String cacheArchives;
+ protected URI[] fileURIs;
+ protected URI[] archiveURIs;
protected ArrayList configPath_ = new ArrayList(); // <String>
protected String hadoopAliasConf_;
protected String inReaderSpec_;
@@ -780,5 +844,6 @@
protected RunningJob running_;
protected String jobId_;
-
+ protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+ "Please specify a different link name for all of your caching URIs";
}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=auto&rev=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Tue Oct 31 12:13:31 2006
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+/**
+ * This test case tests the symlink creation
+ * utility provided by distributed caching
+ * @author mahadev
+ *
+ */
+public class TestSymLink extends TestCase
+{
+ String INPUT_FILE = "/testing-streaming/input.txt";
+ String OUTPUT_DIR = "/testing-streaming/out";
+ String CACHE_FILE = "/testing-streaming/cache.txt";
+ String input = "check to see if we can read this none reduce";
+ String map = "xargs cat ";
+ String reduce = "cat";
+ String mapString = "testlink\n";
+ String cacheString = "This is just the cache string";
+ StreamJob job;
+
+ public TestSymLink() throws IOException
+ {
+ }
+
+ public void testSymLink()
+ {
+ try {
+ boolean mayExit = false;
+ int jobTrackerPort = 60050;
+ MiniMRCluster mr = null;
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ try{
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(8050, conf, false);
+ fileSys = dfs.getFileSystem();
+ String namenode = fileSys.getName();
+ mr = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ String strJobtracker = "mapred.job.tracker=" + "localhost:" + jobTrackerPort;
+ String strNamenode = "fs.default.name=" + namenode;
+ String argv[] = new String[] {
+ "-input", INPUT_FILE,
+ "-output", OUTPUT_DIR,
+ "-mapper", map,
+ "-reducer", reduce,
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", strNamenode,
+ "-jobconf", strJobtracker,
+ "-cacheFile", "dfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+ };
+
+ fileSys.delete(new Path(OUTPUT_DIR));
+ fileSys.mkdirs(new Path(OUTPUT_DIR));
+
+ DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+ file.writeBytes(mapString);
+ file.close();
+ file = fileSys.create(new Path(CACHE_FILE));
+ file.writeBytes(cacheString);
+ file.close();
+
+ job = new StreamJob(argv, mayExit);
+ job.go();
+ String line = null;
+ Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+ for (int i = 0; i < fileList.length; i++){
+ System.out.println(fileList[i].toString());
+ BufferedReader bread =
+ new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+ line = bread.readLine();
+ System.out.println(line);
+ }
+ assertEquals(cacheString + "\t", line);
+ } finally{
+ if (fileSys != null) { fileSys.close(); }
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown();}
+ }
+
+ } catch(Exception e) {
+ failTrace(e);
+ }
+ }
+
+ void failTrace(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ fail(sw.toString());
+ }
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestStreaming().testCommandLine();
+ }
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Oct 31 12:13:31 2006
@@ -302,14 +302,7 @@
*/
public String[] getStrings(String name) {
String valueString = get(name);
- if (valueString == null)
- return null;
- StringTokenizer tokenizer = new StringTokenizer (valueString,",");
- List values = new ArrayList();
- while (tokenizer.hasMoreTokens()) {
- values.add(tokenizer.nextToken());
- }
- return (String[])values.toArray(new String[values.size()]);
+ return StringUtils.getStrings(valueString);
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Oct 31 12:13:31 2006
@@ -43,7 +43,7 @@
/**
*
* @param cache the cache to be localized, this should be specified as
- * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema
+ * new URI(dfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
* or hostname:port is provided the file is assumed to be in the filesystem
* being used in the Configuration
* @param conf The Confguration file which contains the filesystem
@@ -55,12 +55,14 @@
* @param md5 this is a mere checksum to verufy if you are using the right cache.
* You need to pass the md5 of the crc file in DFS. This is matched against the one
* calculated in this api and if it does not match, the cache is not localized.
+ * @param currentWorkDir this is the directory where you would want to create symlinks
+ * for the locally cached files/archives
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
*/
public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
- boolean isArchive, String md5) throws IOException {
+ boolean isArchive, String md5, Path currentWorkDir) throws IOException {
String cacheId = makeRelative(cache, conf);
CacheStatus lcacheStatus;
Path localizedPath;
@@ -80,7 +82,7 @@
}
}
synchronized (lcacheStatus) {
- localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+ localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5, currentWorkDir);
}
// try deleting stuff if you can
long size = FileUtil.getDU(new File(baseDir.toString()));
@@ -157,15 +159,26 @@
// the methoed which actually copies the caches locally and unjars/unzips them
private static Path localizeCache(URI cache, CacheStatus cacheStatus,
- Configuration conf, boolean isArchive, String md5) throws IOException {
+ Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
boolean b = true;
+ boolean doSymlink = getSymlink(conf);
FileSystem dfs = getFileSystem(cache, conf);
b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
if (b) {
- if (isArchive)
+ if (isArchive) {
+ if (doSymlink)
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+
return cacheStatus.localLoadPath;
- else
+ }
+ else {
+ if (doSymlink)
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+
return cacheFilePath(cacheStatus.localLoadPath);
+ }
} else {
// remove the old archive
// if the old archive cannot be removed since it is being used by another
@@ -179,7 +192,6 @@
localFs.delete(cacheStatus.localLoadPath);
Path parchive = new Path(cacheStatus.localLoadPath,
new Path(cacheStatus.localLoadPath.getName()));
-
localFs.mkdirs(cacheStatus.localLoadPath);
String cacheId = cache.getPath();
dfs.copyToLocalFile(new Path(cacheId), parchive);
@@ -199,14 +211,23 @@
// else will not do anyhting
// and copy the file into the dir as it is
}
+ // create a symlink if #NAME is specified as fragment in the
+ // symlink
cacheStatus.currentStatus = true;
cacheStatus.md5 = checkSum;
}
- if (isArchive)
+ if (isArchive){
+ if (doSymlink)
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(),
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
return cacheStatus.localLoadPath;
- else
+ }
+ else {
+ if (doSymlink)
+ FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
+ currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
return cacheFilePath(cacheStatus.localLoadPath);
-
+ }
}
// Checks if the cache has already been localized and is fresh
@@ -451,6 +472,75 @@
String files = conf.get("mapred.cache.files");
conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+ uri.toString());
+ }
+
+ /**
+ * This method allows you to create symlinks in the current working directory
+ * of the task to all the cache files/archives
+ * @param conf the jobconf
+ */
+ public static void createSymlink(Configuration conf){
+ conf.set("mapred.create.symlink", "yes");
+ }
+
+ /**
+ * This method checks to see if symlinks are to be created for the
+ * localized cache files in the current working directory
+ * @param conf the jobconf
+ * @return true if symlinks are to be created- else return false
+ */
+ public static boolean getSymlink(Configuration conf){
+ String result = conf.get("mapred.create.symlink");
+ if ("yes".equals(result)){
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This method checks if there is a conflict in the fragment names
+ * of the uris. Also makes sure that each uri has a fragment. It
+ * is only to be called if you want to create symlinks for
+ * the various archives and files.
+ * @param uriFiles The uri array of urifiles
+ * @param uriArchives the uri array of uri archives
+ */
+ public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
+ if ((uriFiles == null) && (uriArchives == null)){
+ return true;
+ }
+ if (uriFiles != null){
+ for (int i = 0; i < uriFiles.length; i++){
+ String frag1 = uriFiles[i].getFragment();
+ if (frag1 == null)
+ return false;
+ for (int j=i+1; j < uriFiles.length; i++){
+ String frag2 = uriFiles[j].getFragment();
+ if (frag2 == null)
+ return false;
+ if (frag1.equalsIgnoreCase(frag2))
+ return false;
+ }
+ if (uriArchives != null){
+ for (int j = 0; j < uriArchives.length; j++){
+ String frag2 = uriArchives[j].getFragment();
+ if (frag2 == null){
+ return false;
+ }
+ if (frag1.equalsIgnoreCase(frag2))
+ return false;
+ for (int k=j+1; k < uriArchives.length; k++){
+ String frag3 = uriArchives[k].getFragment();
+ if (frag3 == null)
+ return false;
+ if (frag2.equalsIgnoreCase(frag3))
+ return false;
+ }
+ }
+ }
+ }
+ }
+ return true;
}
private static class CacheStatus {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Oct 31 12:13:31 2006
@@ -301,4 +301,23 @@
zipFile.close();
}
}
+
+ /**
+ * Create a soft link between a src and destination
+ * only on a local disk. HDFS does not support this
+ * @param target the target for symlink
+ * @param linkname the symlink to create
+ * @return value returned by the command
+ */
+ public static int symLink(String target, String linkname) throws IOException{
+ String cmd = "ln -s " + target + " " + linkname;
+ Process p = Runtime.getRuntime().exec( cmd, null );
+ int returnVal = -1;
+ try{
+ returnVal = p.waitFor();
+ } catch(InterruptedException e){
+ //do nothing as of yet
+ }
+ return returnVal;
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 31 12:13:31 2006
@@ -80,7 +80,7 @@
//before preparing the job localize
//all the archives
-
+ File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
if ((archives != null) || (files != null)) {
@@ -88,7 +88,8 @@
String[] md5 = DistributedCache.getArchiveMd5(conf);
Path[] p = new Path[archives.length];
for (int i = 0; i < archives.length;i++){
- p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+ p[i] = DistributedCache.getLocalCache(archives[i], conf,
+ conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
}
@@ -97,7 +98,7 @@
Path[] p = new Path[files.length];
for (int i = 0; i < files.length;i++){
p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
- .getCacheSubdir()), false, md5[i]);
+ .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
@@ -123,7 +124,6 @@
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
- File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
workDir.mkdirs();
String jar = conf.getJar();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Tue Oct 31 12:13:31 2006
@@ -24,7 +24,10 @@
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.DecimalFormat;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
+import java.util.StringTokenizer;
import org.apache.hadoop.fs.*;
@@ -253,4 +256,19 @@
return buf.toString();
}
-}
+ /**
+ * returns an array of strings
+ * @param str the comma separated string values
+ * @return the array of the comma separated string values
+ */
+ public static String[] getStrings(String str){
+ if (str == null)
+ return null;
+ StringTokenizer tokenizer = new StringTokenizer (str,",");
+ List values = new ArrayList();
+ while (tokenizer.hasMoreTokens()) {
+ values.add(tokenizer.nextToken());
+ }
+ return (String[])values.toArray(new String[values.size()]);
+ }
+}
\ No newline at end of file