Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/15 00:13:44 UTC
svn commit: r443497 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/
src/test/org/apache/hadoop/mapred/
Author: cutting
Date: Thu Sep 14 15:13:43 2006
New Revision: 443497
URL: http://svn.apache.org/viewvc?view=rev&rev=443497
Log:
HADOOP-288. Add a file caching system and use it in MapReduce to cache job jar files on slave nodes. Contributed by Mahadev.
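For context, a minimal sketch of the job-side usage this commit enables. The
dfs:// URIs are hypothetical; the md5 of each cache's .crc file is computed and
recorded automatically by JobClient.submitJob, and TaskRunner localizes the
caches before the task starts:

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheUsageSketch {
      // at job-submission time: register caches in the job conf
      public static void setUp(JobConf job) throws Exception {
        // archives (.zip/.jar) are unarchived on the slave node
        DistributedCache.addCacheArchive(new URI("dfs://host:9000/cache/test.zip"), job);
        // plain files are copied as-is
        DistributedCache.addCacheFile(new URI("dfs://host:9000/cache/test.txt"), job);
      }
      // inside a task (e.g. Mapper.configure) after localization:
      public static void use(JobConf job) throws Exception {
        Path[] localArchives = DistributedCache.getLocalCacheArchives(job);
        Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
      }
    }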
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar (with props)
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip (with props)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 14 15:13:43 2006
@@ -14,6 +14,9 @@
3. HADOOP-530. Improve error messages in SequenceFile when keys or
values are of the wrong type. (Hairong Kuang via cutting)
+4. HADOOP-288. Add a file caching system and use it in MapReduce to
+ cache job jar files on slave nodes. (Mahadev Konar via cutting)
+
Release 0.6.1 - 2006-08-13
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Thu Sep 14 15:13:43 2006
@@ -36,6 +36,7 @@
<property name="test.src.dir" value="${basedir}/src/test"/>
<property name="test.build.dir" value="${build.dir}/test"/>
<property name="test.build.data" value="${test.build.dir}/data"/>
+ <property name="test.cache.data" value="${test.build.dir}/cache"/>
<property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
<property name="test.build.classes" value="${test.build.dir}/classes"/>
<property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
@@ -277,6 +278,11 @@
value="org/apache/hadoop/test/AllTestDriver"/>
</manifest>
</jar>
+ <delete dir="${test.cache.data}"/>
+ <mkdir dir="${test.cache.data}"/>
+ <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
+ <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
+ <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>
</target>
<!-- ================================================================== -->
@@ -288,7 +294,6 @@
<mkdir dir="${test.build.data}"/>
<delete dir="${hadoop.log.dir}"/>
<mkdir dir="${hadoop.log.dir}"/>
-
<junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${test.build.data}"/>
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Sep 14 15:13:43 2006
@@ -271,6 +271,14 @@
</property>
<property>
+ <name>local.cache.size</name>
+ <value>10737418240</value>
+ <description>The limit on the size of the cache to keep, set by default
+ to 10GB. This acts as a soft limit on the cache directory for out-of-band data.
+ </description>
+</property>
+
+<property>
<name>mapred.system.dir</name>
<value>${hadoop.tmp.dir}/mapred/system</value>
<description>The shared directory where MapReduce stores control files.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,466 @@
+/* Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import org.apache.commons.logging.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.net.URI;
+
+/*******************************************************************************
+ * The DistributedCache maintains the caching information for cached archives
+ * and files, unarchives them as needed, and returns the localized path
+ *
+ * @author Mahadev Konar
+ ******************************************************************************/
+public class DistributedCache {
+ // cacheID to cacheStatus mapping
+ private static TreeMap cachedArchives = new TreeMap();
+ // buffer size for reading checksum files
+ private static final int CRC_BUFFER_SIZE = 64 * 1024;
+
+ /**
+ *
+ * @param cache the cache to be localized; this should be specified as
+ * new URI(dfs://hostname:port/absolute_path_to_file). If no scheme
+ * or hostname:port is provided, the file is assumed to be in the filesystem
+ * being used in the Configuration
+ * @param conf The Configuration which contains the filesystem
+ * @param baseDir The base cache dir where you want to localize the files/archives
+ * @param isArchive whether the cache is an archive or a file. If it is an archive
+ * with a .zip or .jar extension it will be unzipped/unjarred automatically,
+ * and the directory where the archive is unjarred is returned as the Path.
+ * In case of a file, the path to the file is returned
+ * @param md5 a checksum to verify that you are using the right cache.
+ * You need to pass the md5 of the crc file in DFS. This is matched against the one
+ * calculated in this API; if it does not match, the cache is not localized.
+ * @return the path to directory where the archives are unjarred in case of archives,
+ * the path to the file where the file is copied locally
+ * @throws IOException
+ */
+ public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
+ boolean isArchive, String md5) throws IOException {
+ String cacheId = makeRelative(cache, conf);
+ CacheStatus lcacheStatus;
+ Path localizedPath;
+ synchronized (cachedArchives) {
+ if (!cachedArchives.containsKey(cacheId)) {
+ // was never localized
+ lcacheStatus = new CacheStatus();
+ lcacheStatus.currentStatus = false;
+ lcacheStatus.refcount = 1;
+ lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));
+ cachedArchives.put(cacheId, lcacheStatus);
+ } else {
+ lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+ synchronized (lcacheStatus) {
+ lcacheStatus.refcount++;
+ }
+ }
+ }
+ synchronized (lcacheStatus) {
+ localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+ }
+ // opportunistically delete unused cache entries if over the size limit
+ long size = FileUtil.getDU(new File(baseDir.toString()));
+ // fall back to a 1MB limit if local.cache.size is not set in the conf
+ long allowedSize = conf.getLong("local.cache.size", 1048576L);
+ if (allowedSize < size) {
+ // try some cache deletions
+ deleteCache(conf);
+ }
+ return localizedPath;
+ }
+
+ /**
+ * This is the opposite of getLocalCache. When you are done
+ * using the cache, you need to release it
+ * @param cache The cache URI to be released
+ * @param conf configuration which contains the filesystem the cache
+ * is contained in.
+ * @throws IOException
+ */
+ public static void releaseCache(URI cache, Configuration conf)
+ throws IOException {
+ String cacheId = makeRelative(cache, conf);
+ synchronized (cachedArchives) {
+ CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+ synchronized (lcacheStatus) {
+ lcacheStatus.refcount--;
+ }
+ }
+ }
+
+ // To delete the caches which have a refcount of zero
+
+ private static void deleteCache(Configuration conf) throws IOException {
+ // try deleting cache Status with refcount of zero
+ synchronized (cachedArchives) {
+ for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
+ String cacheId = (String) it.next();
+ CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+ if (lcacheStatus.refcount == 0) {
+ // delete this cache entry
+ FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath);
+ cachedArchives.remove(cacheId);
+ }
+ }
+ }
+ }
+
+ /*
+ * Returns the relative path of the directory this cache will be localized in.
+ * For dfs://hostname:port/absolute_path the relative path is
+ * hostname/absolute_path; if it is just /absolute_path, then the relative
+ * path is <hostname of the DFS this mapred cluster is running on>/absolute_path
+ */
+ private static String makeRelative(URI cache, Configuration conf)
+ throws IOException {
+ String fsname = cache.getScheme();
+ String path;
+ FileSystem dfs = FileSystem.get(conf);
+ if ("dfs".equals(fsname)) {
+ path = cache.getHost() + cache.getPath();
+ } else {
+ String[] split = dfs.getName().split(":");
+ path = split[0] + cache.getPath();
+ }
+ return path;
+ }
+
+ private static Path cacheFilePath(Path p) {
+ return new Path(p, p.getName());
+ }
+
+ // the method that actually copies the caches locally and unjars/unzips them
+ private static Path localizeCache(URI cache, CacheStatus cacheStatus,
+ Configuration conf, boolean isArchive, String md5) throws IOException {
+ boolean b = true;
+ FileSystem dfs = getFileSystem(cache, conf);
+ b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
+ if (b) {
+ if (isArchive)
+ return cacheStatus.localLoadPath;
+ else
+ return cacheFilePath(cacheStatus.localLoadPath);
+ } else {
+ // remove the old archive
+ // if the old archive cannot be removed since it is being used by another
+ // job
+ // return null
+ if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+ throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+ + " is in use and cannot be refreshed");
+ byte[] checkSum = createMD5(cache, conf);
+ FileSystem localFs = FileSystem.getNamed("local", conf);
+ localFs.delete(cacheStatus.localLoadPath);
+ Path parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName()));
+
+ localFs.mkdirs(cacheStatus.localLoadPath);
+ String cacheId = cache.getPath();
+ dfs.copyToLocalFile(new Path(cacheId), parchive);
+ dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive
+ .toString()
+ + "_md5"));
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(new File(parchive.toString()), new File(parchive
+ .getParent().toString()));
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(new File(parchive.toString()), new File(parchive
+ .getParent().toString()));
+
+ }
+ // otherwise do nothing; the file stays
+ // in the dir as it was copied
+ }
+ cacheStatus.currentStatus = true;
+ cacheStatus.md5 = checkSum;
+ }
+ if (isArchive)
+ return cacheStatus.localLoadPath;
+ else
+ return cacheFilePath(cacheStatus.localLoadPath);
+
+ }
+
+ // Checks if the cache has already been localized and is fresh
+ private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache,
+ FileSystem dfs, String confMD5, Configuration conf) throws IOException {
+ // compute the md5 of the crc
+ byte[] digest = null;
+ byte[] fsDigest = createMD5(cache, conf);
+ byte[] confDigest = StringUtils.hexStringToByte(confMD5);
+ // check for existence of the cache
+ if (lcacheStatus.currentStatus == false) {
+ return false;
+ } else {
+ digest = lcacheStatus.md5;
+ if (!MessageDigest.isEqual(confDigest, fsDigest)) {
+ throw new IOException("Inconsistencty in data caching, "
+ + "Cache archives have been changed");
+ } else {
+ if (!MessageDigest.isEqual(confDigest, digest)) {
+ // needs refreshing
+ return false;
+ } else {
+ // does not need any refreshing
+ return true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns md5 of the checksum file for a given dfs file.
+ * This method also creates file filename_md5 existence of which
+ * signifies a new cache has been loaded into dfs. So if you want to
+ * refresh the cache, you need to delete this md5 file as well.
+ * @param cache The cache to get the md5 checksum for
+ * @param conf configuration
+ * @return md5 of the crc of the cache parameter
+ * @throws IOException
+ */
+ public static byte[] createMD5(URI cache, Configuration conf)
+ throws IOException {
+ byte[] b = new byte[CRC_BUFFER_SIZE];
+ byte[] digest = null;
+
+ FileSystem fileSystem = getFileSystem(cache, conf);
+ String filename = cache.getPath();
+ Path filePath = new Path(filename);
+ Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
+ + filePath.getName() + "_md5");
+ MessageDigest md5 = null;
+ try {
+ md5 = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException na) {
+ // do nothing
+ }
+ if (!fileSystem.exists(md5File)) {
+ FSInputStream fsStream = fileSystem.openRaw(FileSystem
+ .getChecksumFile(filePath));
+ int read = fsStream.read(b);
+ while (read != -1) {
+ md5.update(b, 0, read);
+ read = fsStream.read(b);
+ }
+ fsStream.close();
+ digest = md5.digest();
+
+ FSDataOutputStream out = fileSystem.create(md5File);
+ out.write(digest);
+ out.close();
+ } else {
+ FSInputStream fsStream = fileSystem.openRaw(md5File);
+ digest = new byte[md5.getDigestLength()];
+ // assuming the full 16-byte digest is read in one call,
+ // though the number of bytes actually read should be checked
+ int read = fsStream.read(digest);
+ fsStream.close();
+ }
+
+ return digest;
+ }
+
+ private static String getFileSysName(URI url) {
+ String fsname = url.getScheme();
+ if ("dfs".equals(fsname)) {
+ String host = url.getHost();
+ int port = url.getPort();
+ return (port == (-1)) ? host : (host + ":" + port);
+ } else {
+ return null;
+ }
+ }
+
+ private static FileSystem getFileSystem(URI cache, Configuration conf)
+ throws IOException {
+ String fileSysName = getFileSysName(cache);
+ if (fileSysName != null)
+ return FileSystem.getNamed(fileSysName, conf);
+ else
+ return FileSystem.get(conf);
+ }
+
+ /**
+ * Set the configuration with the given set of archives
+ * @param archives The list of archives that need to be localized
+ * @param conf Configuration which will be changed
+ */
+ public static void setCacheArchives(URI[] archives, Configuration conf) {
+ String sarchives = StringUtils.uriToString(archives);
+ conf.set("mapred.cache.archives", sarchives);
+ }
+
+ /**
+ * Set the configuration with the given set of files
+ * @param files The list of files that need to be localized
+ * @param conf Configuration which will be changed
+ */
+ public static void setCacheFiles(URI[] files, Configuration conf) {
+ String sfiles = StringUtils.uriToString(files);
+ conf.set("mapred.cache.files", sfiles);
+ }
+
+ /**
+ * Get cache archives set in the Configuration
+ * @param conf The configuration which contains the archives
+ * @return A URI array of the caches set in the Configuration
+ * @throws IOException
+ */
+ public static URI[] getCacheArchives(Configuration conf) throws IOException {
+ return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+ }
+
+ /**
+ * Get cache files set in the Configuration
+ * @param conf The configuration which contains the files
+ * @return A URI array of the files set in the Configuration
+ * @throws IOException
+ */
+
+ public static URI[] getCacheFiles(Configuration conf) throws IOException {
+ return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+ }
+
+ /**
+ * Return the path array of the localized caches
+ * @param conf Configuration that contains the localized archives
+ * @return A path array of localized caches
+ * @throws IOException
+ */
+ public static Path[] getLocalCacheArchives(Configuration conf)
+ throws IOException {
+ return StringUtils.stringToPath(conf
+ .getStrings("mapred.cache.localArchives"));
+ }
+
+ /**
+ * Return the path array of the localized files
+ * @param conf Configuration that contains the localized files
+ * @return A path array of localized files
+ * @throws IOException
+ */
+ public static Path[] getLocalCacheFiles(Configuration conf)
+ throws IOException {
+ return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+ }
+
+ /**
+ * Get the md5 checksums of the archives
+ * @param conf The configuration which stored the md5's
+ * @return a string array of md5 checksums
+ * @throws IOException
+ */
+ public static String[] getArchiveMd5(Configuration conf) throws IOException {
+ return conf.getStrings("mapred.cache.archivemd5");
+ }
+
+
+ /**
+ * Get the md5 checksums of the files
+ * @param conf The configuration which stored the md5's
+ * @return a string array of md5 checksums
+ * @throws IOException
+ */
+ public static String[] getFileMd5(Configuration conf) throws IOException {
+ return conf.getStrings("mapred.cache.filemd5");
+ }
+
+ /**
+ * Set the md5 checksums of the archives to be localized, so they can
+ * be verified at localization time
+ * @param conf Configuration which stores the md5's
+ * @param md5 comma-separated list of md5 checksums of the .crc's of archives.
+ * The order should be the same as the order in which the archives are added
+ */
+ public static void setArchiveMd5(Configuration conf, String md5) {
+ conf.set("mapred.cache.archivemd5", md5);
+ }
+
+ /**
+ * Set the md5 checksums of the files to be localized, so they can
+ * be verified at localization time
+ * @param conf Configuration which stores the md5's
+ * @param md5 comma-separated list of md5 checksums of the .crc's of files.
+ * The order should be the same as the order in which the files are added
+ */
+ public static void setFileMd5(Configuration conf, String md5) {
+ conf.set("mapred.cache.filemd5", md5);
+ }
+
+ /**
+ * Set the conf to contain the location for localized archives
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma-separated list of local archives
+ */
+ public static void setLocalArchives(Configuration conf, String str) {
+ conf.set("mapred.cache.localArchives", str);
+ }
+
+ /**
+ * Set the conf to contain the location for localized files
+ * @param conf The conf to modify to contain the localized caches
+ * @param str a comma-separated list of local files
+ */
+ public static void setLocalFiles(Configuration conf, String str) {
+ conf.set("mapred.cache.localFiles", str);
+ }
+
+ /**
+ * Add an archive to be localized to the conf
+ * @param uri The uri of the cache to be localized
+ * @param conf Configuration to add the cache to
+ */
+ public static void addCacheArchive(URI uri, Configuration conf) {
+ String archives = conf.get("mapred.cache.archives");
+ conf.set("mapred.cache.archives", archives == null ? uri.toString()
+ : archives + "," + uri.toString());
+ }
+
+ /**
+ * Add a file to be localized to the conf
+ * @param uri The uri of the cache to be localized
+ * @param conf Configuration to add the cache to
+ */
+ public static void addCacheFile(URI uri, Configuration conf) {
+ String files = conf.get("mapred.cache.files");
+ conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+ + uri.toString());
+ }
+
+ private static class CacheStatus {
+ // false means not loaded yet, true means loaded
+ public boolean currentStatus;
+
+ // the local load path of this cache
+ public Path localLoadPath;
+
+ // number of instances using this cache
+ public int refcount;
+
+ // The md5 checksum of the crc file of this cache
+ public byte[] md5;
+ }
+
+}
\ No newline at end of file
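For reference, a minimal sketch of the localize/release lifecycle implemented
above (the URI and base dir are hypothetical; the md5 argument is the hex
digest of the cache's .crc file, computed here the same way JobClient does):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.StringUtils;

    public class LocalizeSketch {
      public static void run(Configuration conf) throws Exception {
        URI cache = new URI("dfs://host:9000/cache/test.jar");
        String md5 = StringUtils.byteToHexString(DistributedCache.createMD5(cache, conf));
        // bumps the refcount and unjars the archive under the base dir
        Path local = DistributedCache.getLocalCache(cache, conf,
            new Path("/tmp/hadoop/archive"), true, md5);
        System.out.println("localized at " + local);
        try {
          // ... read files under 'local' ...
        } finally {
          // drops the refcount; entries at zero may be deleted later
          DistributedCache.releaseCache(cache, conf);
        }
      }
    }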
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Thu Sep 14 15:13:43 2006
@@ -17,6 +17,9 @@
package org.apache.hadoop.fs;
import java.io.*;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import org.apache.hadoop.conf.Configuration;
@@ -230,5 +233,68 @@
}
return dst;
}
-
+
+ /**
+ * Takes an input dir and returns the disk usage (du) of that local
+ * directory. Very basic implementation.
+ *
+ * @param dir
+ * The local dir to compute the disk usage of
+ * @return The total disk space used by the input local directory
+ */
+ public static long getDU(File dir) {
+ long size = 0;
+ if (!dir.exists())
+ return 0;
+ if (!dir.isDirectory()) {
+ return dir.length();
+ } else {
+ size = dir.length();
+ File[] allFiles = dir.listFiles();
+ for (int i = 0; i < allFiles.length; i++) {
+ size = size + getDU(allFiles[i]);
+ }
+ return size;
+ }
+ }
+
+ /**
+ * Given a File input it will unzip the file into the unzip directory
+ * passed as the second parameter
+ * @param inFile The zip file as input
+ * @param unzipDir The unzip directory where to unzip the zip file.
+ * @throws IOException
+ */
+ public static void unZip(File inFile, File unzipDir) throws IOException {
+ Enumeration entries;
+ ZipFile zipFile = new ZipFile(inFile);
+ try {
+ entries = zipFile.entries();
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = (ZipEntry) entries.nextElement();
+ if (!entry.isDirectory()) {
+ InputStream in = zipFile.getInputStream(entry);
+ try {
+ File file = new File(unzipDir, entry.getName());
+ file.getParentFile().mkdirs();
+ OutputStream out = new FileOutputStream(file);
+ try {
+ byte[] buffer = new byte[8192];
+ int i;
+ while ((i = in.read(buffer)) != -1) {
+ out.write(buffer, 0, i);
+ }
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
+ } finally {
+ zipFile.close();
+ }
+ }
}
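The two helpers added here are plain static utilities; a minimal usage sketch
(the local paths are hypothetical):

    import java.io.File;
    import org.apache.hadoop.fs.FileUtil;

    public class FileUtilSketch {
      public static void main(String[] args) throws Exception {
        // recursive disk usage of a local directory, in bytes
        long bytes = FileUtil.getDU(new File("/tmp/hadoop/archive"));
        System.out.println("cache dir uses " + bytes + " bytes");
        // extract a zip into a target directory, creating parent dirs as needed
        FileUtil.unZip(new File("/tmp/test.zip"), new File("/tmp/unzipped"));
      }
    }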
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Sep 14 15:13:43 2006
@@ -21,7 +21,7 @@
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
-
+import org.apache.hadoop.filecache.*;
import java.io.*;
import java.net.*;
import java.util.*;
@@ -227,7 +227,8 @@
JobConf job = new JobConf(jobFile);
return submitJob(job);
}
-
+
+
/**
* Submit a job to the MR system
*/
@@ -244,11 +245,39 @@
Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
Path submitJobFile = new Path(submitJobDir, "job.xml");
Path submitJarFile = new Path(submitJobDir, "job.jar");
-
- String originalJarPath = job.getJar();
-
FileSystem fs = getFs();
-
+ // try getting the md5 of the archives
+ URI[] tarchives = DistributedCache.getCacheArchives(job);
+ URI[] tfiles = DistributedCache.getCacheFiles(job);
+ if ((tarchives != null) || (tfiles != null)) {
+ // prepare these archives for md5 checksums
+ if (tarchives != null) {
+ String md5Archives = StringUtils.byteToHexString(DistributedCache
+ .createMD5(tarchives[0], job));
+ for (int i = 1; i < tarchives.length; i++) {
+ md5Archives = md5Archives
+ + ","
+ + StringUtils.byteToHexString(DistributedCache
+ .createMD5(tarchives[i], job));
+ }
+ DistributedCache.setArchiveMd5(job, md5Archives);
+ //job.set("mapred.cache.archivemd5", md5Archives);
+ }
+ if (tfiles != null) {
+ String md5Files = StringUtils.byteToHexString(DistributedCache
+ .createMD5(tfiles[0], job));
+ for (int i = 1; i < tfiles.length; i++) {
+ md5Files = md5Files
+ + ","
+ + StringUtils.byteToHexString(DistributedCache
+ .createMD5(tfiles[i], job));
+ }
+ DistributedCache.setFileMd5(job, md5Files);
+ //"mapred.cache.filemd5", md5Files);
+ }
+ }
+
+ String originalJarPath = job.getJar();
short replication = (short)job.getInt("mapred.submit.replication", 10);
if (originalJarPath != null) { // copy jar to JobTracker's fs
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 14 15:13:43 2006
@@ -171,6 +171,7 @@
String dirs = get("mapred.input.dir");
set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
}
+
public Path[] getInputPaths() {
String dirs = get("mapred.input.dir", "");
ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -197,8 +198,10 @@
set("user.name", user);
}
+
+
/**
- * Set whether the framework shoul keep the intermediate files for
+ * Set whether the framework should keep the intermediate files for
* failed tasks.
*/
public void setKeepFailedTaskFiles(boolean keep) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Sep 14 15:13:43 2006
@@ -20,11 +20,12 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;
-
+import org.apache.hadoop.filecache.*;
import java.io.*;
import java.util.jar.*;
import java.util.Vector;
import java.util.Enumeration;
+import java.net.URI;
/** Base class that runs a task in a separate process. Tasks are run in a
* separate process in order to isolate the map/reduce system code from bugs in
@@ -61,25 +62,71 @@
*/
public void close() throws IOException {}
+ private String stringifyPathArray(Path[] p){
+ if (p == null){
+ return null;
+ }
+ String str = p[0].toString();
+ for (int i = 1; i < p.length; i++){
+ str = str + "," + p[i].toString();
+ }
+ return str;
+ }
+
public final void run() {
try {
-
+
+ //before preparing the job localize
+ //all the archives
+
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ if ((archives != null) || (files != null)) {
+ if (archives != null) {
+ String[] md5 = DistributedCache.getArchiveMd5(conf);
+ Path[] p = new Path[archives.length];
+ for (int i = 0; i < archives.length;i++){
+ p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+ }
+ DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
+ }
+ if ((files != null)) {
+ String[] md5 = DistributedCache.getFileMd5(conf);
+ Path[] p = new Path[files.length];
+ for (int i = 0; i < files.length;i++){
+ p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
+ .getCacheSubdir()), false, md5[i]);
+ }
+ DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
+ }
+
+ // sets the paths to local archives and paths
+ Path localTaskFile = new Path(t.getJobFile());
+ FileSystem localFs = FileSystem.getNamed("local", conf);
+ localFs.delete(localTaskFile);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ conf.write(out);
+ } finally {
+ out.close();
+ }
+ }
+
if (! prepare()) {
return;
}
String sep = System.getProperty("path.separator");
- File workDir = new File(new File(t.getJobFile()).getParent(), "work");
- workDir.mkdirs();
-
StringBuffer classPath = new StringBuffer();
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
-
+ File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+ workDir.mkdirs();
+
String jar = conf.getJar();
- if (jar != null) { // if jar exists, it into workDir
- RunJar.unJar(new File(jar), workDir);
+ if (jar != null) {
+ // the job jar was already unjarred into workDir by the TaskTracker
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
@@ -160,10 +207,27 @@
LOG.warn(t.getTaskId()+" Reporting Diagnostics", e);
}
} finally {
+ try{
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ if (archives != null){
+ for (int i = 0; i < archives.length; i++){
+ DistributedCache.releaseCache(archives[i], conf);
+ }
+ }
+ if (files != null){
+ for(int i = 0; i < files.length; i++){
+ DistributedCache.releaseCache(files[i], conf);
+ }
+ }
+ }catch(IOException ie){
+ LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
+ }
tracker.reportTaskFinished(t.getTaskId());
}
}
+
/**
* Handle deprecated mapred.child.heap.size.
* If present, interpolate into mapred.child.java.opts value with
@@ -238,6 +302,7 @@
logStream(process.getInputStream()); // normally empty
int exit_code = process.waitFor();
+
if (!killed && exit_code != 0) {
throw new IOException("Task process exit with nonzero status of " +
exit_code + ".");
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Sep 14 15:13:43 2006
@@ -71,6 +71,7 @@
* Map from taskId -> TaskInProgress.
*/
TreeMap runningTasks = null;
+ Map runningJobs = null;
int mapTotal = 0;
int reduceTotal = 0;
boolean justStarted = true;
@@ -89,11 +90,11 @@
static Random r = new Random();
FileSystem fs = null;
- static final String SUBDIR = "taskTracker";
-
+ private static final String SUBDIR = "taskTracker";
+ private static final String CACHEDIR = "archive";
+ private static final String JOBCACHE = "jobcache";
private JobConf fConf;
private MapOutputFile mapOutputFile;
-
private int maxCurrentTasks;
private int failures;
private int finishedCount[] = new int[1];
@@ -145,6 +146,14 @@
taskCleanupThread.start();
}
+ static String getCacheSubdir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+ }
+
+ static String getJobCacheSubdir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+ }
+
public long getProtocolVersion(String protocol, long clientVersion) {
return TaskUmbilicalProtocol.versionID;
}
@@ -167,6 +176,7 @@
// Clear out state tables
this.tasks = new TreeMap();
this.runningTasks = new TreeMap();
+ this.runningJobs = new TreeMap();
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
@@ -207,8 +217,84 @@
this.running = true;
}
+
+ // initialize the job directory
+ private void localizeJob(TaskInProgress tip) throws IOException {
+ Path localJarFile = null;
+ Task t = tip.getTask();
+ Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+ .getJobId()
+ + Path.SEPARATOR + "job.xml"));
+ RunningJob rjob = null;
+ synchronized (runningJobs) {
+ if (!runningJobs.containsKey(t.getJobId())) {
+ rjob = new RunningJob();
+ rjob.localized = false;
+ rjob.tasks = new ArrayList();
+ rjob.jobFile = localJobFile;
+ rjob.tasks.add(tip);
+ runningJobs.put(t.getJobId(), rjob);
+ } else {
+ rjob = (RunningJob) runningJobs.get(t.getJobId());
+ // keep this for later use when we just get a jobid to delete
+ // the data for
+ rjob.tasks.add(tip);
+ }
+ }
+ synchronized (rjob) {
+ if (!rjob.localized) {
+ localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+ .getJobId())
+ + Path.SEPARATOR + "job.jar");
+
+ String jobFile = t.getJobFile();
+ fs.copyToLocalFile(new Path(jobFile), localJobFile);
+ JobConf localJobConf = new JobConf(localJobFile);
+ String jarFile = localJobConf.getJar();
+ if (jarFile != null) {
+ fs.copyToLocalFile(new Path(jarFile), localJarFile);
+ localJobConf.setJar(localJarFile.toString());
+ FileSystem localFs = FileSystem.getNamed("local", fConf);
+ OutputStream out = localFs.create(localJobFile);
+ try {
+ localJobConf.write(out);
+ } finally {
+ out.close();
+ }
- public synchronized void shutdown() throws IOException {
+ // also unjar the job.jar files in workdir
+ File workDir = new File(
+ new File(localJobFile.toString()).getParent(),
+ "work");
+ workDir.mkdirs();
+ RunJar.unJar(new File(localJarFile.toString()), workDir);
+ }
+ rjob.localized = true;
+ }
+ }
+ launchTaskForJob(tip, new JobConf(rjob.jobFile));
+ }
+
+ private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+ synchronized (tip) {
+ try {
+ tip.setJobConf(jobConf);
+ tip.launchTask();
+ } catch (Throwable ie) {
+ tip.runstate = TaskStatus.FAILED;
+ try {
+ tip.cleanup();
+ } catch (Throwable ie2) {
+ // Ignore it, we are just trying to cleanup.
+ }
+ String error = StringUtils.stringifyException(ie);
+ tip.reportDiagnosticInfo(error);
+ LOG.info(error);
+ }
+ }
+ }
+
+ public synchronized void shutdown() throws IOException {
shuttingDown = true;
close();
if (this.server != null) {
@@ -312,6 +398,12 @@
} catch (InterruptedException ie) {}
}
}
+ /** Return the DFS filesystem
+ * @return the filesystem used by this TaskTracker
+ */
+ public FileSystem getFileSystem(){
+ return fs;
+ }
/**
* Main service loop. Will stay in this loop forever.
@@ -448,6 +540,10 @@
synchronized (this) {
for (int i = 0; i < toCloseIds.length; i++) {
Object tip = tasks.get(toCloseIds[i]);
+ synchronized(runningJobs){
+ runningJobs.remove(((TaskInProgress)
+ tasks.get(toCloseIds[i])).getTask().getJobId());
+ }
if (tip != null) {
tasksToCleanup.put(tip);
} else {
@@ -551,8 +647,8 @@
return true;
}
-
- /**
+
+ /**
* Start a new task.
* All exceptions are handled locally, so that we don't mess up the
* task tracker.
@@ -569,20 +665,10 @@
reduceTotal++;
}
}
- synchronized (tip) {
- try {
- tip.launchTask();
- } catch (Throwable ie) {
- tip.runstate = TaskStatus.FAILED;
- try {
- tip.cleanup();
- } catch (Throwable ie2) {
- // Ignore it, we are just trying to cleanup.
- }
- String error = StringUtils.stringifyException(ie);
- tip.reportDiagnosticInfo(error);
- LOG.info(error);
- }
+ try{
+ localizeJob(tip);
+ }catch(IOException ie){
+ LOG.warn("Error initializing Job " + tip.getTask().getJobId());
}
}
@@ -691,6 +777,7 @@
private JobConf localJobConf;
private boolean keepFailedTaskFiles;
private boolean alwaysKeepTaskFiles;
+ private boolean keepJobFiles;
/**
*/
@@ -702,60 +789,52 @@
this.lastProgressReport = System.currentTimeMillis();
this.defaultJobConf = conf;
localJobConf = null;
+ keepJobFiles = false;
}
-
- /**
- * Some fields in the Task object need to be made machine-specific.
- * So here, edit the Task's fields appropriately.
- */
- private void localizeTask(Task t) throws IOException {
- this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" +
- task.getTaskId());
- Path localJobFile =
- this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
- Path localJarFile =
- this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
-
- String jobFile = t.getJobFile();
- fs.copyToLocalFile(new Path(jobFile), localJobFile);
- t.setJobFile(localJobFile.toString());
-
- localJobConf = new JobConf(localJobFile);
- localJobConf.set("mapred.local.dir",
- this.defaultJobConf.get("mapred.local.dir"));
- String jarFile = localJobConf.getJar();
- if (jarFile != null) {
- fs.copyToLocalFile(new Path(jarFile), localJarFile);
- localJobConf.setJar(localJarFile.toString());
- }
- task.localizeConfiguration(localJobConf);
-
- FileSystem localFs = FileSystem.getNamed("local", fConf);
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.write(out);
- } finally {
- out.close();
- }
- // set the task's configuration to the local job conf
- // rather than the default.
- t.setConf(localJobConf);
- keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+
+ private void localizeTask(Task task) throws IOException{
+ Path localTaskDir =
+ new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
+ + JOBCACHE + Path.SEPARATOR
+ + task.getJobId()), task.getTaskId());
+ FileSystem localFs = FileSystem.getNamed("local", fConf);
+ localFs.mkdirs(localTaskDir);
+ Path localTaskFile = new Path(localTaskDir, "job.xml");
+ task.setJobFile(localTaskFile.toString());
+ localJobConf.set("mapred.local.dir",
+ fConf.get("mapred.local.dir"));
+
+ localJobConf.set("mapred.task.id", task.getTaskId());
+ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ task.localizeConfiguration(localJobConf);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ localJobConf.write(out);
+ } finally {
+ out.close();
+ }
+ task.setConf(localJobConf);
String keepPattern = localJobConf.getKeepTaskFilesPattern();
if (keepPattern != null) {
- alwaysKeepTaskFiles =
+ keepJobFiles = true;
+ alwaysKeepTaskFiles =
Pattern.matches(keepPattern, task.getTaskId());
} else {
alwaysKeepTaskFiles = false;
}
}
-
+
/**
*/
public Task getTask() {
return task;
}
+ public void setJobConf(JobConf lconf){
+ this.localJobConf = lconf;
+ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ }
+
/**
*/
public synchronized TaskStatus createStatus() {
@@ -876,11 +955,18 @@
* finished. If the task is still running, kill it (and clean up
*/
public synchronized void jobHasFinished() throws IOException {
+
if (getRunState() == TaskStatus.RUNNING) {
killAndCleanup(false);
} else {
cleanup();
}
+ if (keepJobFiles)
+ return;
+ // delete the job directory for this task
+ // since the job is done/failed
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
+ JOBCACHE + Path.SEPARATOR + task.getJobId());
}
/**
@@ -934,10 +1020,13 @@
}
}
}
- this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
- }
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
+ JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+ taskId);
+ }
}
+
// ///////////////////////////////////////////////////////////////
// TaskUmbilicalProtocol
/////////////////////////////////////////////////////////////////
@@ -1034,6 +1123,16 @@
} else {
LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");
}
+ }
+
+ /**
+ * The datastructure for initializing a job
+ */
+ static class RunningJob{
+ Path jobFile;
+ // keep this for later use
+ ArrayList tasks;
+ boolean localized;
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Thu Sep 14 15:13:43 2006
@@ -18,7 +18,10 @@
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.text.DecimalFormat;
+import org.apache.hadoop.fs.*;
/**
* General string utils
@@ -104,5 +107,82 @@
sbuf.append(strs[idx]);
}
return sbuf.toString();
+ }
+
+ /**
+ * Given an array of bytes it will convert the bytes to a hex string
+ * representation of the bytes
+ * @param bytes the bytes to convert
+ * @return hex string representation of the byte array
+ */
+ public static String byteToHexString(byte bytes[]) {
+ StringBuffer retString = new StringBuffer();
+ for (int i = 0; i < bytes.length; ++i) {
+ retString.append(Integer.toHexString(0x0100 + (bytes[i] & 0x00FF))
+ .substring(1));
+ }
+ return retString.toString();
+ }
+
+ /**
+ * Given a hexstring this will return the byte array corresponding to the
+ * string
+ * @param hex the hex String array
+ * @return the byte array corresponding to the given hex
+ * string. The size of the byte array is therefore hex.length/2
+ */
+ public static byte[] hexStringToByte(String hex) {
+ byte[] bts = new byte[hex.length() / 2];
+ for (int i = 0; i < bts.length; i++) {
+ bts[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
+ }
+ return bts;
+ }
+ /**
+ * Convert an array of URIs to a comma-separated string
+ * @param uris the URIs to stringify
+ * @return a comma-separated string of the given URIs
+ */
+ public static String uriToString(URI[] uris){
+ String ret = null;
+ ret = uris[0].toString();
+ for(int i = 1; i < uris.length;i++){
+ ret = ret + "," + uris[i].toString();
+ }
+ return ret;
+ }
+
+ /**
+ * Convert an array of strings to URIs
+ * @param str the strings to convert
+ * @return an array of URIs; entries that fail to parse are set to null
+ */
+ public static URI[] stringToURI(String[] str){
+ if (str == null)
+ return null;
+ URI[] uris = new URI[str.length];
+ for (int i = 0; i < str.length;i++){
+ try{
+ uris[i] = new URI(str[i]);
+ }catch(URISyntaxException ur){
+ System.out.println("Exception in specified URI's " + StringUtils.stringifyException(ur));
+ //making sure its asssigned to null in case of an error
+ uris[i] = null;
+ }
+ }
+ return uris;
+ }
+
+ /**
+ * Convert an array of strings to Paths
+ * @param str the strings to convert
+ * @return an array of Paths, one per input string
+ */
+ public static Path[] stringToPath(String[] str){
+ Path[] p = new Path[str.length];
+ for (int i = 0; i < str.length;i++){
+ p[i] = new Path(str[i]);
+ }
+ return p;
}
}
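A minimal sketch exercising the new helpers (the values are arbitrary):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.StringUtils;

    public class StringUtilsSketch {
      public static void main(String[] args) {
        // hex round trip: byteToHexString and hexStringToByte are inverses
        byte[] digest = new byte[] { (byte) 0xca, (byte) 0xfe };
        String hex = StringUtils.byteToHexString(digest);   // "cafe"
        byte[] back = StringUtils.hexStringToByte(hex);     // { 0xca, 0xfe }
        // URI/Path helpers used by DistributedCache's conf plumbing
        URI[] uris = StringUtils.stringToURI(new String[] { "dfs://host:9000/a" });
        String joined = StringUtils.uriToString(uris);      // comma-separated
        Path[] paths = StringUtils.stringToPath(new String[] { "/tmp/a", "/tmp/b" });
        System.out.println(hex + " " + joined + " " + paths.length + " " + back.length);
      }
    }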
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,205 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.MapReduceBase;
+import java.io.*;
+import org.apache.hadoop.filecache.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class MRCaching {
+ static String testStr = "This is a test file " + "used for testing caching "
+ + "jars, zip and normal files.";
+
+ /**
+ * Using the wordcount example and adding caching to it. The cache
+ * archives/files are set and then are checked in the map if they have been
+ * localized or not.
+ */
+ public static class MapClass extends MapReduceBase implements Mapper {
+ JobConf conf;
+
+ private final static IntWritable one = new IntWritable(1);
+
+ private Text word = new Text();
+
+ public void configure(JobConf jconf) {
+ conf = jconf;
+ try {
+ Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+ FileSystem fs = FileSystem.get(conf);
+ // read the cached files (unzipped, unjarred and text)
+ // and put it into a single file /tmp/test.txt
+ Path file = new Path("/tmp");
+ fs.mkdirs(file);
+ Path fileOut = new Path(file, "test.txt");
+ fs.delete(file);
+ DataOutputStream out = fs.create(fileOut);
+
+ for (int i = 0; i < localArchives.length; i++) {
+ // read out the files from these archives
+ File f = new File(localArchives[i].toString());
+ File txt = new File(f, "test.txt");
+ FileInputStream fin = new FileInputStream(txt);
+ DataInputStream din = new DataInputStream(fin);
+ String str = din.readLine();
+ din.close();
+ out.writeBytes(str);
+ out.writeBytes("\n");
+ }
+ for (int i = 0; i < localFiles.length; i++) {
+ // read out the files from these archives
+ File txt = new File(localFiles[i].toString());
+ FileInputStream fin = new FileInputStream(txt);
+ DataInputStream din = new DataInputStream(fin);
+ String str = din.readLine();
+ out.writeBytes(str);
+ out.writeBytes("\n");
+ }
+ out.close();
+ } catch (IOException ie) {
+ System.out.println(StringUtils.stringifyException(ie));
+ }
+ }
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter) throws IOException {
+ String line = ((Text) value).toString();
+ StringTokenizer itr = new StringTokenizer(line);
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ output.collect(word, one);
+ }
+
+ }
+ }
+
+ /**
+ * A reducer class that just emits the sum of the input values.
+ */
+ public static class ReduceClass extends MapReduceBase implements Reducer {
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter) throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += ((IntWritable) values.next()).get();
+ }
+ output.collect(key, new IntWritable(sum));
+ }
+ }
+
+ public static boolean launchMRCache(String jobTracker, String indir,
+ String outdir, String fileSys, JobConf conf, String input)
+ throws IOException {
+ final Path inDir = new Path(indir);
+ final Path outDir = new Path(outdir);
+ FileSystem fs = FileSystem.getNamed(fileSys, conf);
+ fs.delete(outDir);
+ fs.mkdirs(inDir);
+
+ {
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+ }
+ conf.set("fs.default.name", fileSys);
+ conf.set("mapred.job.tracker", jobTracker);
+ conf.setJobName("cachetest");
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(MRCaching.MapClass.class);
+ conf.setCombinerClass(MRCaching.ReduceClass.class);
+ conf.setReducerClass(MRCaching.ReduceClass.class);
+ conf.setInputPath(inDir);
+ conf.setOutputPath(outDir);
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(1);
+ conf.setSpeculativeExecution(false);
+ Path localPath = new Path("build/test/cache");
+ Path txtPath = new Path(localPath, new Path("test.txt"));
+ Path jarPath = new Path(localPath, new Path("test.jar"));
+ Path zipPath = new Path(localPath, new Path("test.zip"));
+ Path cacheTest = new Path("/tmp/cachedir");
+ fs.delete(cacheTest);
+ fs.mkdirs(cacheTest);
+ fs.copyFromLocalFile(txtPath, cacheTest);
+ fs.copyFromLocalFile(jarPath, cacheTest);
+ fs.copyFromLocalFile(zipPath, cacheTest);
+ // setting the cached archives to zip, jar and simple text files
+ String archive1 = "dfs://" + fileSys + "/tmp/cachedir/test.jar";
+ String archive2 = "dfs://" + fileSys + "/tmp/cachedir/test.zip";
+ String file1 = "dfs://" + fileSys + "/tmp/cachedir/test.txt";
+ URI uri1 = null;
+ URI uri2 = null;
+ URI uri3 = null;
+ try{
+ uri1 = new URI(archive1);
+ uri2 = new URI(archive2);
+ uri3 = new URI(file1);
+ } catch(URISyntaxException ur){
+ }
+ DistributedCache.addCacheArchive(uri1, conf);
+ DistributedCache.addCacheArchive(uri2, conf);
+ DistributedCache.addCacheFile(uri3, conf);
+ JobClient.runJob(conf);
+ int count = 0;
+ // after the job ran, check whether the input from the localized cache
+ // matches the real string, and whether there are 3 instances of it.
+ Path result = new Path("/tmp/test.txt");
+ {
+ BufferedReader file = new BufferedReader(new InputStreamReader(fs
+ .open(result)));
+ String line = file.readLine();
+ while (line != null) {
+ if (!testStr.equals(line))
+ return false;
+ count++;
+ line = file.readLine();
+
+ }
+ file.close();
+ }
+ if (count != 3)
+ return false;
+
+ return true;
+
+ }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+
+/**
+ * A JUnit test to test caching with DFS
+ *
+ * @author Mahadev Konar
+ */
+public class TestMiniMRDFSCaching extends TestCase {
+
+ public void testWithDFS() throws IOException {
+ MiniMRCluster mr = null;
+ MiniDFSCluster dfs = null;
+ String namenode = null;
+ FileSystem fileSys = null;
+ try {
+ Configuration conf = new Configuration();
+ dfs = new MiniDFSCluster(65314, conf, true);
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getName();
+ mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+ JobConf jconf = new JobConf();
+ // run the wordcount example with caching
+ boolean ret = MRCaching.launchMRCache("localhost:50050",
+ "/testing/wc/input",
+ "/testing/wc/output", namenode,
+ jconf,
+ "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n");
+ assertTrue("Archives not matching", ret);
+ } finally {
+ if (fileSys != null) {
+ fileSys.close();
+ }
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ public static void main(String[] argv) throws Exception {
+ TestMiniMRDFSCaching td = new TestMiniMRDFSCaching();
+ td.testWithDFS();
+ }
+}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Thu Sep 14 15:13:43 2006
@@ -36,6 +36,14 @@
double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+ JobConf jconf = new JobConf();
+ // run the wordcount example with caching
+ boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input",
+ "/tmp/wc/output", "local", jconf,
+ "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n");
+ // assert the number of lines read during caching
+ assertTrue("Failed test archives not matching", ret);
} finally {
if (mr != null) { mr.shutdown(); }
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Sep 14 15:13:43 2006
@@ -95,12 +95,14 @@
* @param taskDirs the task ids that should be present
*/
private static void checkTaskDirectories(MiniMRCluster mr,
+ String[] jobIds,
String[] taskDirs) {
mr.waitUntilIdle();
int trackers = mr.getNumTaskTrackers();
List neededDirs = new ArrayList(Arrays.asList(taskDirs));
boolean[] found = new boolean[taskDirs.length];
for(int i=0; i < trackers; ++i) {
+ int numNotDel = 0;
File localDir = new File(mr.getTaskTrackerLocalDir(i));
File trackerDir = new File(localDir, "taskTracker");
assertTrue("local dir " + localDir + " does not exist.",
@@ -113,7 +115,7 @@
System.out.println("Local " + localDir + ": " + contents[j]);
}
for(int j=0; j < trackerContents.length; ++j) {
- System.out.println("Local " + trackerDir + ": " + trackerContents[j]);
+ System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
}
for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
String name = contents[fileIdx];
@@ -123,13 +125,11 @@
localDir, idx != -1);
assertTrue("Matching output directory not found " + name +
" in " + trackerDir,
- new File(trackerDir, name).isDirectory());
+ new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
found[idx] = true;
+ numNotDel++;
}
}
- assertTrue("The local directory had " + contents.length +
- " and task tracker directory had " + trackerContents.length +
- " items.", contents.length == trackerContents.length + 1);
}
for(int i=0; i< found.length; i++) {
assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
@@ -155,7 +155,7 @@
jobTrackerName, namenode);
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
- checkTaskDirectories(mr, new String[]{});
+ checkTaskDirectories(mr, new String[]{}, new String[]{});
// Run a word count example
JobConf jobConf = new JobConf();
@@ -168,7 +168,7 @@
3, 1);
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
- checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"});
+ checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
} finally {
if (fileSys != null) { fileSys.close(); }
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar?view=auto&rev=443497
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt Thu Sep 14 15:13:43 2006
@@ -0,0 +1 @@
+This is a test file used for testing caching jars, zip and normal files.
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip?view=auto&rev=443497
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream