You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ni...@apache.org on 2008/03/08 20:56:39 UTC
svn commit: r635061 - in /hadoop/core/branches/branch-0.16: ./
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/secur...
Author: nigel
Date: Sat Mar 8 11:56:35 2008
New Revision: 635061
URL: http://svn.apache.org/viewvc?rev=635061&view=rev
Log:
Branch-specific patch for HADOOP-2915. Fixed FileSystem.CACHE so that a username is included in the cache key. Contributed by Tsz Wo (Nicholas) SZE.
Added:
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java (with props)
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (with props)
Modified:
hadoop/core/branches/branch-0.16/CHANGES.txt
hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Sat Mar 8 11:56:35 2008
@@ -132,6 +132,9 @@
they were looking at the same config variables (Chris Douglas via
acmurthy)
+ HADOOP-2915. Fixed FileSystem.CACHE so that a username is included
+ in the cache key. (Tsz Wo (Nicholas) SZE via nigel)
+
Release 0.16.0 - 2008-02-07
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Sat Mar 8 11:56:35 2008
@@ -55,11 +55,10 @@
boolean mayExit = false;
MiniMRCluster mr = null;
MiniDFSCluster dfs = null;
- FileSystem fileSys = null;
try{
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, 1, true, null);
- fileSys = dfs.getFileSystem();
+ FileSystem fileSys = dfs.getFileSystem();
String namenode = fileSys.getName();
mr = new MiniMRCluster(1, namenode, 3);
// During tests, the default Configuration will use a local mapred
@@ -98,6 +97,8 @@
job = new StreamJob(argv, mayExit);
job.go();
+
+ fileSys = dfs.getFileSystem();
String line = null;
String line2 = null;
Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
@@ -113,7 +114,6 @@
assertEquals(cacheString + "\t", line);
assertEquals(cacheString2 + "\t", line2);
} finally{
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();}
}
Modified: hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/core/branches/branch-0.16/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Sat Mar 8 11:56:35 2008
@@ -52,11 +52,10 @@
boolean mayExit = false;
MiniMRCluster mr = null;
MiniDFSCluster dfs = null;
- FileSystem fileSys = null;
try{
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, 1, true, null);
- fileSys = dfs.getFileSystem();
+ FileSystem fileSys = dfs.getFileSystem();
String namenode = fileSys.getName();
mr = new MiniMRCluster(1, namenode, 3);
// During tests, the default Configuration will use a local mapred
@@ -90,6 +89,8 @@
job = new StreamJob(argv, mayExit);
job.go();
+
+ fileSys = dfs.getFileSystem();
String line = null;
Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
for (int i = 0; i < fileList.length; i++){
@@ -101,7 +102,6 @@
}
assertEquals(cacheString + "\t", line);
} finally{
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();}
}
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/fs/FileSystem.java Sat Mar 8 11:56:35 2008
@@ -28,15 +28,15 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.security.UserGroupInformation;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk. The local version
- * exists for small Hadopp instances and for testing.
+ * exists for small Hadoop instances and for testing.
*
* <p>
*
@@ -50,12 +50,12 @@
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is {@link DistributedFileSystem}.
*****************************************************************/
-public abstract class FileSystem extends Configured {
+public abstract class FileSystem extends Configured implements Closeable {
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FileSystem");
- // cache indexed by URI scheme and authority
- private static final Map<String,Map<String,FileSystem>> CACHE
- = new HashMap<String,Map<String,FileSystem>>();
+ /** FileSystem cache */
+ private static final Cache CACHE = new Cache();
+
/**
* Parse the cmd-line args, starting at i. Remove consumed args
* from array. We expect param in the form:
@@ -137,9 +137,7 @@
* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
* The entire URI is passed to the FileSystem instance's initialize method.
*/
- public static synchronized FileSystem get(URI uri, Configuration conf)
- throws IOException {
-
+ public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
@@ -147,27 +145,7 @@
return get(conf);
}
- Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
- if (authorityToFs == null) {
- if (CACHE.isEmpty()) {
- Runtime.getRuntime().addShutdownHook(clientFinalizer);
- }
- authorityToFs = new HashMap<String,FileSystem>();
- CACHE.put(scheme, authorityToFs);
- }
-
- FileSystem fs = authorityToFs.get(authority);
- if (fs == null) {
- Class fsClass = conf.getClass("fs."+scheme+".impl", null);
- if (fsClass == null) {
- throw new IOException("No FileSystem for scheme: " + scheme);
- }
- fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf);
- fs.initialize(uri, conf);
- authorityToFs.put(authority, fs);
- }
-
- return fs;
+ return CACHE.get(uri, conf);
}
private static class ClientFinalizer extends Thread {
@@ -187,14 +165,8 @@
*
* @throws IOException
*/
- public static synchronized void closeAll() throws IOException {
- Set<String> scheme = new HashSet<String>(CACHE.keySet());
- for (String s : scheme) {
- Set<String> authority = new HashSet<String>(CACHE.get(s).keySet());
- for (String a : authority) {
- CACHE.get(s).get(a).close();
- }
- }
+ public static void closeAll() throws IOException {
+ CACHE.closeAll();
}
/** Make sure that a path specifies a FileSystem. */
@@ -1110,22 +1082,7 @@
* release any held locks.
*/
public void close() throws IOException {
- URI uri = getUri();
- synchronized (FileSystem.class) {
- Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
- if (authorityToFs != null) {
- authorityToFs.remove(uri.getAuthority());
- if (authorityToFs.isEmpty()) {
- CACHE.remove(uri.getScheme());
- if (CACHE.isEmpty() && !clientFinalizer.isAlive()) {
- if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
- LOG.info("Could not cancel cleanup thread, though no " +
- "FileSystems are open");
- }
- }
- }
- }
- }
+ CACHE.remove(new Cache.Key(getUri(), getConf()));
}
/** Return the total size of all files in the filesystem.*/
@@ -1211,5 +1168,119 @@
*/
public void setOwner(Path p, String username, String groupname
) throws IOException {
+ }
+
+ private static FileSystem createFileSystem(URI uri, Configuration conf
+ ) throws IOException {
+ Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
+ if (clazz == null) {
+ throw new IOException("No FileSystem for scheme: " + uri.getScheme());
+ }
+ FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+ fs.initialize(uri, conf);
+ return fs;
+ }
+
+ /** Caching FileSystem objects */
+ private static class Cache {
+ final Map<Key, FsRef> map = new HashMap<Key, FsRef>();
+
+ synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
+ Key key = new Key(uri, conf);
+ FsRef ref = map.get(key);
+ FileSystem fs = ref == null? null: ref.get();
+ if (fs == null) {
+ if (map.isEmpty() && !clientFinalizer.isAlive()) {
+ Runtime.getRuntime().addShutdownHook(clientFinalizer);
+ }
+
+ fs = createFileSystem(uri, conf);
+ map.put(key, new FsRef(fs, key));
+ }
+ return fs;
+ }
+
+ synchronized FsRef remove(Key key) {
+ FsRef ref = map.remove(key);
+ if (map.isEmpty() && !clientFinalizer.isAlive()) {
+ if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
+ LOG.info("Could not cancel cleanup thread, though no " +
+ "FileSystems are open");
+ }
+ }
+ return ref;
+ }
+
+ synchronized void closeAll() throws IOException {
+ List<IOException> exceptions = new ArrayList<IOException>();
+ for(FsRef ref : new ArrayList<FsRef>(map.values())) {
+ FileSystem fs = ref.get();
+ if (fs != null) {
+ try {
+ fs.close();
+ }
+ catch(IOException ioe) {
+ exceptions.add(ioe);
+ }
+ }
+ else {
+ remove(ref.key);
+ }
+ }
+
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ }
+
+ /** Reference of FileSystem which contains the cache key */
+ private static class FsRef {
+ final FileSystem fs;
+ final Key key;
+
+ FsRef(FileSystem fs, Key key) {
+ this.fs = fs;
+ this.key = key;
+ }
+
+ FileSystem get() {return fs;}
+ }
+
+ /** FileSystem.Cache.Key */
+ private static class Key {
+ final String scheme;
+ final String authority;
+ final String username;
+
+ Key(URI uri, Configuration conf) throws IOException {
+ scheme = uri.getScheme();
+ authority = uri.getAuthority();
+ UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
+ username = ugi == null? null: ugi.getUserName();
+ }
+
+ /** {@inheritDoc} */
+ public int hashCode() {
+ return (scheme + authority + username).hashCode();
+ }
+
+ static boolean isEqual(Object a, Object b) {
+ return a == b || (a != null && a.equals(b));
+ }
+
+ /** {@inheritDoc} */
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj != null && obj instanceof Key) {
+ Key that = (Key)obj;
+ return isEqual(this.scheme, that.scheme)
+ && isEqual(this.authority, that.authority)
+ && isEqual(this.username, that.username);
+ }
+ return false;
+ }
+ }
}
}
Added: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java?rev=635061&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java (added)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java Sat Mar 8 11:56:35 2008
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Encapsulate a list of {@link IOException} into an {@link IOException} */
+public class MultipleIOException extends IOException {
+ /** Required by {@link java.io.Serializable} */
+ private static final long serialVersionUID = 1L;
+
+ private final List<IOException> exceptions;
+
+ /** Constructor is private, use {@link #createIOException(List)}. */
+ private MultipleIOException(List<IOException> exceptions) {
+ super(exceptions.size() + " exceptions " + exceptions);
+ this.exceptions = exceptions;
+ }
+
+ /** @return the underlying exceptions */
+ public List<IOException> getExceptions() {return exceptions;}
+
+ /** A convenient method to create an {@link IOException}. */
+ public static IOException createIOException(List<IOException> exceptions) {
+ if (exceptions == null || exceptions.isEmpty()) {
+ return null;
+ }
+ if (exceptions.size() == 1) {
+ return exceptions.get(0);
+ }
+ return new MultipleIOException(exceptions);
+ }
+}
\ No newline at end of file
Propchange: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/io/MultipleIOException.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/mapred/TaskTracker.java Sat Mar 8 11:56:35 2008
@@ -147,7 +147,6 @@
//for serving map output to the other nodes
static Random r = new Random();
- FileSystem fs = null;
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
@@ -658,6 +657,7 @@
throw new IOException("Not able to create job directory "
+ jobDir.toString());
}
+ FileSystem fs =FileSystem.getNamed(jobClient.getFilesystemName(),fConf);
fs.copyToLocalFile(new Path(jobFile), localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
@@ -846,12 +846,6 @@
return jobClient;
}
- /**Return the DFS filesystem
- */
- public FileSystem getFileSystem(){
- return fs;
- }
-
/** Return the port at which the tasktracker bound to */
public synchronized InetSocketAddress getTaskTrackerReportAddress() {
return taskReportAddress;
@@ -891,7 +885,6 @@
*/
State offerService() throws Exception {
long lastHeartbeat = 0;
- this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
while (running && !shuttingDown) {
try {
Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/security/UserGroupInformation.java Sat Mar 8 11:56:35 2008
@@ -17,8 +17,13 @@
*/
package org.apache.hadoop.security;
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
/** A {@link Writable} abstract class for storing user and groups information.
@@ -53,4 +58,15 @@
* @return an array of group names
*/
public abstract String[] getGroupNames();
+
+ /** Read a {@link UserGroupInformation} from conf */
+ public static UserGroupInformation readFrom(Configuration conf
+ ) throws IOException {
+ try {
+ return UnixUserGroupInformation.readFromConf(conf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME);
+ } catch (LoginException e) {
+ throw (IOException)new IOException().initCause(e);
+ }
+ }
}
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/fs/TestFileSystem.java Sat Mar 8 11:56:35 2008
@@ -35,7 +35,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.InputFormatBase;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
@@ -43,9 +42,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
public class TestFileSystem extends TestCase {
- private static final Log LOG = InputFormatBase.LOG;
+ private static final Log LOG = FileSystem.LOG;
private static Configuration conf = new Configuration();
private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
@@ -463,4 +464,17 @@
}
}
+ static Configuration createConf4Testing(String username) throws Exception {
+ Configuration conf = new Configuration();
+ UnixUserGroupInformation.saveToConf(conf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ new UnixUserGroupInformation(username, new String[]{"group"}));
+ return conf;
+ }
+
+ public void testFsCache() throws Exception {
+ Configuration c1 = createConf4Testing("foo");
+ Configuration c2 = createConf4Testing("bar");
+ assertFalse(FileSystem.get(c1) == FileSystem.get(c2));
+ }
}
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Sat Mar 8 11:56:35 2008
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
import java.util.Map;
import java.util.Properties;
@@ -43,7 +44,6 @@
public abstract class ClusterMapReduceTestCase extends TestCase {
private MiniDFSCluster dfsCluster = null;
private MiniMRCluster mrCluster = null;
- private FileSystem fileSystem = null;
/**
* Creates Hadoop Cluster and DFS before a test case is run.
@@ -79,11 +79,10 @@
}
}
dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
- fileSystem = dfsCluster.getFileSystem();
ConfigurableMiniMRCluster.setConfiguration(props);
//noinspection deprecation
- mrCluster = new ConfigurableMiniMRCluster(2, fileSystem.getName(), 1);
+ mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(), 1);
}
}
@@ -129,7 +128,6 @@
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
- fileSystem = null;
}
}
@@ -150,9 +148,10 @@
* TestCases should use this Filesystem instance.
*
* @return the filesystem used by Hadoop.
+ * @throws IOException
*/
- protected FileSystem getFileSystem() {
- return fileSystem;
+ protected FileSystem getFileSystem() throws IOException {
+ return dfsCluster.getFileSystem();
}
/**
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Sat Mar 8 11:56:35 2008
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UnixUserGroupInformation;
/**
* This class creates a single-process Map-Reduce cluster for junit testing.
@@ -42,6 +43,7 @@
private List<Thread> taskTrackerThreadList = new ArrayList<Thread>();
private String namenode;
+ private UnixUserGroupInformation ugi = null;
/**
* An inner class that runs a job tracker.
@@ -97,7 +99,6 @@
class TaskTrackerRunner implements Runnable {
volatile TaskTracker tt;
int trackerId;
- JobConf conf = createJobConf();
// the localDirs for this taskTracker
String[] localDirs;
volatile boolean isInitialized = false;
@@ -108,7 +109,7 @@
this.trackerId = trackerId;
this.numDir = numDir;
localDirs = new String[numDir];
- conf = createJobConf();
+ JobConf conf = createJobConf();
conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
conf.set("mapred.task.tracker.report.address",
"127.0.0.1:" + taskTrackerPort);
@@ -132,6 +133,14 @@
}
conf.set("mapred.local.dir", localPath.toString());
LOG.info("mapred.local.dir is " + localPath);
+ try {
+ tt = new TaskTracker(conf);
+ isInitialized = true;
+ } catch (Throwable e) {
+ isDead = true;
+ tt = null;
+ LOG.error("task tracker " + trackerId + " crashed", e);
+ }
}
/**
@@ -139,9 +148,9 @@
*/
public void run() {
try {
- tt = new TaskTracker(conf);
- isInitialized = true;
- tt.run();
+ if (tt != null) {
+ tt.run();
+ }
} catch (Throwable e) {
isDead = true;
tt = null;
@@ -227,6 +236,11 @@
result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
result.set("mapred.job.tracker.http.address",
"0.0.0.0:" + jobTrackerInfoPort);
+ if (ugi != null) {
+ result.set("mapred.system.dir", "/mapred/system");
+ UnixUserGroupInformation.saveToConf(result,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+ }
// for debugging have all task output sent to the test output
JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
return result;
@@ -264,12 +278,19 @@
int numTaskTrackers,
String namenode,
boolean taskTrackerFirst, int numDir) throws IOException {
+ this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode,
+ taskTrackerFirst, numDir, null);
+ }
+ public MiniMRCluster(int jobTrackerPort, int taskTrackerPort,
+ int numTaskTrackers, String namenode, boolean taskTrackerFirst,
+ int numDir, UnixUserGroupInformation ugi) throws IOException {
this.jobTrackerPort = jobTrackerPort;
this.taskTrackerPort = taskTrackerPort;
this.jobTrackerInfoPort = 0;
this.numTaskTrackers = numTaskTrackers;
this.namenode = namenode;
+ this.ugi = ugi;
// Create the JobTracker
jobTracker = new JobTrackerRunner();
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/PiEstimator.java Sat Mar 8 11:56:35 2008
@@ -175,15 +175,15 @@
try {
JobClient.runJob(jobConf);
Path inFile = new Path(outDir, "reduce-out");
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
- jobConf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ FileSystem.get(jobConf), inFile, jobConf);
IntWritable numInside = new IntWritable();
IntWritable numOutside = new IntWritable();
reader.next(numInside, numOutside);
reader.close();
estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
} finally {
- fileSys.delete(tmpDir);
+ FileSystem.get(jobConf).delete(tmpDir);
}
return estimate;
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Sat Mar 8 11:56:35 2008
@@ -172,7 +172,6 @@
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
} finally {
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
@@ -207,7 +206,6 @@
}
finally {
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Sat Mar 8 11:56:35 2008
@@ -94,7 +94,6 @@
// Run sort-validator to check if sort worked correctly
runSortValidator(mr.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
} finally {
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Sat Mar 8 11:56:35 2008
@@ -112,7 +112,7 @@
* @param mr the map-reduce cluster
* @param taskDirs the task ids that should be present
*/
- private static void checkTaskDirectories(MiniMRCluster mr,
+ static void checkTaskDirectories(MiniMRCluster mr,
String[] jobIds,
String[] taskDirs) {
mr.waitUntilIdle();
@@ -155,7 +155,55 @@
assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
}
}
-
+
+ static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+ LOG.info("runPI");
+ double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+ double error = Math.abs(Math.PI - estimate);
+ assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+ checkTaskDirectories(mr, new String[]{}, new String[]{});
+ }
+
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ LOG.info("runWordCount");
+ // Run a word count example
+ // Keeping tasks that match this pattern
+ jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
+ TestResult result;
+ final Path inDir = new Path("./wc/input");
+ final Path outDir = new Path("./wc/output");
+ result = launchWordCount(jobConf, inDir, outDir,
+ "The quick brown fox\nhas many silly\n" +
+ "red fox sox\n",
+ 3, 1);
+ assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+ "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
+ String jobid = result.job.getJobID();
+ String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
+ checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
+ // test with maps=0
+ jobConf = mr.createJobConf();
+ result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
+ assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+ // Run a job with input and output going to localfs even though the
+ // default fs is hdfs.
+ {
+ FileSystem localfs = FileSystem.getLocal(jobConf);
+ String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data","/tmp"))
+ .toString().replace(' ', '+');
+ Path localIn = localfs.makeQualified
+ (new Path(TEST_ROOT_DIR + "/local/in"));
+ Path localOut = localfs.makeQualified
+ (new Path(TEST_ROOT_DIR + "/local/out"));
+ result = launchWordCount(jobConf, localIn, localOut,
+ "all your base belong to us", 1, 1);
+ assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n",
+ result.output);
+ assertTrue("outputs on localfs", localfs.exists(localOut));
+ }
+ }
+
public void testWithDFS() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
@@ -166,52 +214,11 @@
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
- mr = new MiniMRCluster(taskTrackers, fileSys.getName(), 1);
- double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES,
- mr.createJobConf());
- double error = Math.abs(Math.PI - estimate);
- assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
- checkTaskDirectories(mr, new String[]{}, new String[]{});
-
- // Run a word count example
- JobConf jobConf = mr.createJobConf();
- // Keeping tasks that match this pattern
- jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
- TestResult result;
- final Path inDir = new Path("/testing/wc/input");
- final Path outDir = new Path("/testing/wc/output");
- result = launchWordCount(jobConf, inDir, outDir,
- "The quick brown fox\nhas many silly\n" +
- "red fox sox\n",
- 3, 1);
- assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
- "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
- String jobid = result.job.getJobID();
- String taskid = "task_" + jobid.substring(4) + "_m_000001_0";
- checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
- // test with maps=0
- jobConf = mr.createJobConf();
- result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
- assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
- // Run a job with input and output going to localfs even though the
- // default fs is hdfs.
- {
- FileSystem localfs = FileSystem.getLocal(jobConf);
- String TEST_ROOT_DIR =
- new File(System.getProperty("test.build.data","/tmp"))
- .toString().replace(' ', '+');
- Path localIn = localfs.makeQualified
- (new Path(TEST_ROOT_DIR + "/local/in"));
- Path localOut = localfs.makeQualified
- (new Path(TEST_ROOT_DIR + "/local/out"));
- result = launchWordCount(jobConf, localIn, localOut,
- "all your base belong to us", 1, 1);
- assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n",
- result.output);
- assertTrue("outputs on localfs", localfs.exists(localOut));
- }
+ mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+
+ runPI(mr, mr.createJobConf());
+ runWordCount(mr, mr.createJobConf());
} finally {
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
}
Added: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=635061&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (added)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Sat Mar 8 11:56:35 2008
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.*;
+
+/**
+ * A JUnit test that runs jobs as distinct users on a Mini Map-Reduce Cluster with Mini-DFS.
+ */
+public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
+ static final long now = System.currentTimeMillis();
+ static final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true);
+ static final UnixUserGroupInformation PI_UGI = createUGI("pi", false);
+ static final UnixUserGroupInformation WC_UGI = createUGI("wc", false);
+
+ static UnixUserGroupInformation createUGI(String name, boolean issuper) {
+ String username = name + now;
+ String group = issuper? "supergroup": username;
+ return UnixUserGroupInformation.createImmutable(
+ new String[]{username, group});
+ }
+
+ static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
+ JobConf jobconf = mr.createJobConf();
+ UnixUserGroupInformation.saveToConf(jobconf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+ return jobconf;
+ }
+
+ static void mkdir(FileSystem fs, String dir) throws IOException {
+ Path p = new Path(dir);
+ fs.mkdirs(p);
+ fs.setPermission(p, new FsPermission((short)0777));
+ }
+
+ public void testDistinctUsers() throws Exception {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ try {
+ Configuration conf = new Configuration();
+ UnixUserGroupInformation.saveToConf(conf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI);
+ dfs = new MiniDFSCluster(conf, 4, true, null);
+ FileSystem fs = dfs.getFileSystem();
+ mkdir(fs, "/user");
+ mkdir(fs, "/mapred");
+
+ UnixUserGroupInformation MR_UGI = createUGI(
+ UnixUserGroupInformation.login().getUserName(), true);
+ mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
+ false, 1, MR_UGI);
+
+ JobConf pi = createJobConf(mr, PI_UGI);
+ TestMiniMRWithDFS.runPI(mr, pi);
+
+ JobConf wc = createJobConf(mr, WC_UGI);
+ TestMiniMRWithDFS.runWordCount(mr, wc);
+ } finally {
+ if (dfs != null) { dfs.shutdown(); }
+ if (mr != null) { mr.shutdown();}
+ }
+ }
+}
\ No newline at end of file
Propchange: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Sat Mar 8 11:56:35 2008
@@ -117,7 +117,6 @@
assertTrue(result);
} finally {
- if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown(); }
}
Modified: hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=635061&r1=635060&r2=635061&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/branches/branch-0.16/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Sat Mar 8 11:56:35 2008
@@ -42,6 +42,11 @@
private static final Log LOG =
LogFactory.getLog(TestPipes.class.getName());
+ static void cleanup(FileSystem fs, Path p) throws IOException {
+ FileUtil.fullyDelete(fs, p);
+ assertFalse("output not cleaned up", fs.exists(p));
+ }
+
public void testPipes() throws IOException {
if (System.getProperty("compile.c++") == null) {
LOG.info("compile.c++ is not defined, so skipping TestPipes");
@@ -49,7 +54,6 @@
}
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
- FileSystem fs = null;
Path cppExamples = new Path(System.getProperty("install.c++.examples"));
Path inputPath = new Path("/testing/in");
Path outputPath = new Path("/testing/out");
@@ -57,20 +61,19 @@
final int numSlaves = 2;
Configuration conf = new Configuration();
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
- fs = dfs.getFileSystem();
- mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
- writeInputFile(fs, inputPath);
- runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"),
+ mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
+ writeInputFile(dfs.getFileSystem(), inputPath);
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
inputPath, outputPath, 3, 2, twoSplitOutput);
- FileUtil.fullyDelete(fs, outputPath);
- assertFalse("output not cleaned up", fs.exists(outputPath));
- runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"),
+ cleanup(dfs.getFileSystem(), outputPath);
+
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
inputPath, outputPath, 3, 0, noSortOutput);
- FileUtil.fullyDelete(fs, outputPath);
- assertFalse("output not cleaned up", fs.exists(outputPath));
- runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
+ cleanup(dfs.getFileSystem(), outputPath);
+
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
inputPath, outputPath, 3, 2, fixedPartitionOutput);
- runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
+ runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
mr.waitUntilIdle();
} finally {
mr.shutdown();
@@ -126,25 +129,28 @@
out.close();
}
- private void runProgram(MiniMRCluster mr, FileSystem fs,
+ private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
Path program, Path inputPath, Path outputPath,
int numMaps, int numReduces, String[] expectedResults
) throws IOException {
Path wordExec = new Path("/testing/bin/application");
- FileUtil.fullyDelete(fs, wordExec.getParent());
- fs.copyFromLocalFile(program, wordExec);
JobConf job = mr.createJobConf();
job.setNumMapTasks(numMaps);
job.setNumReduceTasks(numReduces);
- Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
- Submitter.setIsJavaRecordReader(job, true);
- Submitter.setIsJavaRecordWriter(job, true);
- job.setInputPath(inputPath);
- job.setOutputPath(outputPath);
- RunningJob result = Submitter.submitJob(job);
- assertTrue("pipes job failed", result.isSuccessful());
+ {
+ FileSystem fs = dfs.getFileSystem();
+ FileUtil.fullyDelete(fs, wordExec.getParent());
+ fs.copyFromLocalFile(program, wordExec);
+ Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
+ Submitter.setIsJavaRecordReader(job, true);
+ Submitter.setIsJavaRecordWriter(job, true);
+ job.setInputPath(inputPath);
+ job.setOutputPath(outputPath);
+ RunningJob result = Submitter.submitJob(job);
+ assertTrue("pipes job failed", result.isSuccessful());
+ }
List<String> results = new ArrayList<String>();
- for (Path p:fs.listPaths(outputPath)) {
+ for (Path p:dfs.getFileSystem().listPaths(outputPath)) {
results.add(TestMiniMRWithDFS.readOutput(p, job));
}
assertEquals("number of reduces is wrong",
@@ -163,7 +169,7 @@
* @param program the program to run
* @throws IOException
*/
- private void runNonPipedProgram(MiniMRCluster mr, FileSystem dfs,
+ private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
Path program) throws IOException {
JobConf job = mr.createJobConf();
job.setInputFormat(WordCountInputFormat.class);
@@ -174,8 +180,11 @@
Path outDir = new Path(testDir, "output");
Path wordExec = new Path("/testing/bin/application");
Path jobXml = new Path(testDir, "job.xml");
- FileUtil.fullyDelete(dfs, wordExec.getParent());
- dfs.copyFromLocalFile(program, wordExec);
+ {
+ FileSystem fs = dfs.getFileSystem();
+ FileUtil.fullyDelete(fs, wordExec.getParent());
+ fs.copyFromLocalFile(program, wordExec);
+ }
DataOutputStream out = local.create(new Path(inDir, "part0"));
out.writeBytes("i am a silly test\n");
out.writeBytes("you are silly\n");
@@ -193,7 +202,7 @@
"-input", inDir.toString(),
"-output", outDir.toString(),
"-program",
- dfs.makeQualified(wordExec).toString(),
+ dfs.getFileSystem().makeQualified(wordExec).toString(),
"-reduces", "2"});
} catch (Exception e) {
assertTrue("got exception: " + StringUtils.stringifyException(e), false);