You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/24 13:19:26 UTC
svn commit: r1126981 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/s...
Author: vinodkv
Date: Tue May 24 11:19:25 2011
New Revision: 1126981
URL: http://svn.apache.org/viewvc?rev=1126981&view=rev
Log:
Fix distributed-cache related bugs. (vinodkv)
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java
Removed:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java
hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 24 11:19:25 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Fix distributed-cache related bugs. (vinodkv)
+
Fix for regression on the scheduling of reduces before maps are done (ddas)
Fix NPE in test case (mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue May 24 11:19:25 2011
@@ -25,20 +25,24 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -245,6 +249,10 @@ class YarnChild {
// setup the child's attempt directories
// Do the task-type specific localization
task.localizeConfiguration(job);
+
+ // Set up the DistributedCache related configs
+ setupDistributedCacheConfig(job);
+
// Overwrite the localized task jobconf which is linked to in the current
// work-dir.
Path localTaskFile = new Path(Constants.JOBFILE);
@@ -254,6 +262,62 @@ class YarnChild {
return job;
}
+ /**
+ * Set up the DistributedCache related configs to make
+ * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+ * and
+ * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+ * work.
+ * @param job
+ * @throws IOException
+ */
+ private static void setupDistributedCacheConfig(final JobConf job)
+ throws IOException {
+
+ String localWorkDir = System.getenv("PWD");
+ // ^ ^ all symlinks are created in the current work-dir
+
+ // Update the configuration object with localized archives.
+ URI[] cacheArchives = DistributedCache.getCacheArchives(job);
+ if (cacheArchives != null) {
+ List<String> localArchives = new ArrayList<String>();
+ for (int i = 0; i < cacheArchives.length; ++i) {
+ URI u = cacheArchives[i];
+ Path p = new Path(u);
+ Path name =
+ new Path((null == u.getFragment()) ? p.getName()
+ : u.getFragment());
+ String linkName = name.toUri().getPath();
+ localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
+ }
+ if (!localArchives.isEmpty()) {
+ job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+ .arrayToString(localArchives.toArray(new String[localArchives
+ .size()])));
+ }
+ }
+
+ // Update the configuration object with localized files.
+ URI[] cacheFiles = DistributedCache.getCacheFiles(job);
+ if (cacheFiles != null) {
+ List<String> localFiles = new ArrayList<String>();
+ for (int i = 0; i < cacheFiles.length; ++i) {
+ URI u = cacheFiles[i];
+ Path p = new Path(u);
+ Path name =
+ new Path((null == u.getFragment()) ? p.getName()
+ : u.getFragment());
+ String linkName = name.toUri().getPath();
+ localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
+ }
+ if (!localFiles.isEmpty()) {
+ job.set(MRJobConfig.CACHE_LOCALFILES,
+ StringUtils.arrayToString(localFiles
+ .toArray(new String[localFiles.size()])));
+ }
+ }
+ }
+
private static final FsPermission urw_gr =
FsPermission.createImmutable((short) 0640);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue May 24 11:19:25 2011
@@ -145,8 +145,8 @@ public abstract class TaskAttemptImpl im
private final Lock writeLock;
private Collection<Token<? extends TokenIdentifier>> fsTokens;
private Token<JobTokenIdentifier> jobToken;
- private static AtomicBoolean initialEnvFlag = new AtomicBoolean();
- private static Map<String, String> initialEnv = null;
+ private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+ private static String initialClasspath = null;
private long launchTime;
private long finishTime;
@@ -455,19 +455,20 @@ public abstract class TaskAttemptImpl im
}
/**
- * Lock this on initialEnv so that there is only one fork in the AM for
+ * Lock this on initialClasspathFlag so that there is only one fork in the AM for
* getting the initial class-path. TODO: This should go away once we construct
* a parent CLC and use it for all the containers.
*/
- private Map<String, String> getInitialEnv() throws IOException {
- synchronized (initialEnvFlag) {
- if (initialEnvFlag.get()) {
- return initialEnv;
+ private String getInitialClasspath() throws IOException {
+ synchronized (initialClasspathFlag) {
+ if (initialClasspathFlag.get()) {
+ return initialClasspath;
}
- initialEnv = new HashMap<String, String>();
- MRApps.setInitialClasspath(initialEnv);
- initialEnvFlag.set(true);
- return initialEnv;
+ Map<String, String> env = new HashMap<String, String>();
+ MRApps.setInitialClasspath(env);
+ initialClasspath = env.get(MRApps.CLASSPATH);
+ initialClasspathFlag.set(true);
+ return initialClasspath;
}
}
@@ -546,7 +547,7 @@ public abstract class TaskAttemptImpl im
ByteBuffer.wrap(jobToken_dob.getData(), 0,
jobToken_dob.getLength()));
- container.addAllEnv(getInitialEnv());
+ MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
} catch (IOException e) {
throw new YarnException(e);
}
@@ -612,7 +613,7 @@ public abstract class TaskAttemptImpl im
private void parseDistributedCacheArtifacts(
FileContext remoteFS, ContainerLaunchContext container, LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] classpaths) throws IOException {
+ Path[] pathsToPutOnClasspath) throws IOException {
if (uris != null) {
// Sanity check
@@ -627,18 +628,18 @@ public abstract class TaskAttemptImpl im
}
Map<String, Path> classPaths = new HashMap<String, Path>();
- if (classpaths != null) {
- for (Path p : classpaths) {
+ if (pathsToPutOnClasspath != null) {
+ for (Path p : pathsToPutOnClasspath) {
+ p = p.makeQualified(remoteFS.getDefaultFileSystem()
+ .getUri(), remoteFS.getWorkingDirectory());
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
- if (!p.isAbsolute()) {
- p = p.makeQualified(remoteFS.getDefaultFileSystem()
+ p = p.makeQualified(remoteFS.getDefaultFileSystem()
.getUri(), remoteFS.getWorkingDirectory());
- }
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
@@ -646,8 +647,9 @@ public abstract class TaskAttemptImpl im
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
+ String linkName = name.toUri().getPath();
container.setLocalResource(
- name.toUri().getPath(),
+ linkName,
BuilderUtils.newLocalResource(recordFactory,
p.toUri(), type,
visibilities[i]
@@ -656,23 +658,12 @@ public abstract class TaskAttemptImpl im
sizes[i], timestamps[i])
);
if (classPaths.containsKey(u.getPath())) {
- addCacheArtifactToClassPath(container, name.toUri().getPath());
+ Map<String, String> environment = container.getAllEnv();
+ MRApps.addToClassPath(environment, linkName);
}
}
}
}
-
- private static final String CLASSPATH = "CLASSPATH";
- private static void addCacheArtifactToClassPath(
- ContainerLaunchContext container, String fileName) {
- String classpath = container.getEnv(CLASSPATH);
- if (classpath == null) {
- classpath = fileName;
- } else {
- classpath = classpath + ":" + fileName;
- }
- container.setEnv(CLASSPATH, classpath);
- }
// TODO - Move this to MR!
private static long[] getFileSizes(Configuration conf, String key) {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml Tue May 24 11:19:25 2011
@@ -65,4 +65,29 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <additionalClasspathElements>
+ <additionalClasspathElement>${basedir}/target/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT-tests.jar</additionalClasspathElement>
+ </additionalClasspathElements>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Tue May 24 11:19:25 2011
@@ -73,7 +73,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@@ -227,7 +226,7 @@ public class YARNRunner implements Clien
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
- getApplicationSubmissionContext(conf, jobSubmitDir, ts);
+ createApplicationSubmissionContext(conf, jobSubmitDir, ts);
setupDistributedCache(conf, appContext);
// XXX Remove
@@ -258,7 +257,7 @@ public class YARNRunner implements Clien
return rsrc;
}
- private ApplicationSubmissionContext getApplicationSubmissionContext(
+ private ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException {
ApplicationSubmissionContext appContext =
@@ -357,9 +356,14 @@ public class YARNRunner implements Clien
appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
return appContext;
}
-
+
/**
* TODO: Copied for now from TaskAttemptImpl.java ... fixme
+ *
+ * TODO: This is currently needed in YarnRunner as user code like setupJob,
+ * cleanupJob may need access to dist-cache. Once we separate distcache for
+ * maps, reduces, setup etc, this can include only a subset of artifacts.
+ * This is also needed for uberAM case where we run everything inside AM.
*/
private void setupDistributedCache(Configuration conf,
ApplicationSubmissionContext container) throws IOException {
@@ -386,7 +390,7 @@ public class YARNRunner implements Clien
private void parseDistributedCacheArtifacts(
ApplicationSubmissionContext container, LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] classpaths) throws IOException {
+ Path[] pathsToPutOnClasspath) {
if (uris != null) {
// Sanity check
@@ -401,18 +405,18 @@ public class YARNRunner implements Clien
}
Map<String, Path> classPaths = new HashMap<String, Path>();
- if (classpaths != null) {
- for (Path p : classpaths) {
+ if (pathsToPutOnClasspath != null) {
+ for (Path p : pathsToPutOnClasspath) {
+ p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+ .getUri(), this.defaultFileContext.getWorkingDirectory());
classPaths.put(p.toUri().getPath().toString(), p);
}
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
Path p = new Path(u);
- if (!p.isAbsolute()) {
- p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+ p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
.getUri(), this.defaultFileContext.getWorkingDirectory());
- }
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
@@ -420,8 +424,9 @@ public class YARNRunner implements Clien
if (name.isAbsolute()) {
throw new IllegalArgumentException("Resource name must be relative");
}
+ String linkName = name.toUri().getPath();
container.setResourceTodo(
- name.toUri().getPath(),
+ linkName,
createLocalResource(
p.toUri(), type,
visibilities[i]
@@ -431,8 +436,7 @@ public class YARNRunner implements Clien
);
if (classPaths.containsKey(u.getPath())) {
Map<String, String> environment = container.getAllEnvironment();
- MRApps.addToClassPath(environment, name.toUri().getPath());
- container.addAllEnvironment(environment);
+ MRApps.addToClassPath(environment, linkName);
}
}
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailMapper.java Tue May 24 11:19:25 2011
@@ -0,0 +1,23 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+// Mapper that fails
+public class FailMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+ // NOTE- the next line is required for the TestDebugScript test to succeed
+ System.err.println("failing map");
+ throw new RuntimeException("failing map");
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java Tue May 24 11:19:25 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Fails the Mapper. First attempt throws exception. Rest do System.exit.
+ *
+ */
+public class FailingMapper extends Mapper<Text, Text, Text, Text> {
+ public void map(Text key, Text value,
+ Context context) throws IOException,InterruptedException {
+ if (context.getTaskAttemptID().getId() == 0) {
+ System.out.println("Attempt:" + context.getTaskAttemptID() +
+ " Failing mapper throwing exception");
+ throw new IOException("Attempt:" + context.getTaskAttemptID() +
+ " Failing mapper throwing exception");
+ } else {
+ System.out.println("Attempt:" + context.getTaskAttemptID() +
+ " Exiting");
+ System.exit(-1);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/RandomTextWriterJob.java Tue May 24 11:19:25 2011
@@ -0,0 +1,758 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class RandomTextWriterJob extends Configured implements Tool {
+
+ public static final String TOTAL_BYTES =
+ "mapreduce.randomtextwriter.totalbytes";
+ public static final String BYTES_PER_MAP =
+ "mapreduce.randomtextwriter.bytespermap";
+ public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+ public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+ public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+ public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
+
+ static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+
+ public Job createJob(Configuration conf) throws IOException {
+ long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 10 * 1024);
+ long totalBytesToWrite = conf.getLong(TOTAL_BYTES, numBytesToWritePerMap);
+ int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
+ if (numMaps == 0 && totalBytesToWrite > 0) {
+ numMaps = 1;
+ conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
+ }
+ conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+ Job job = new Job(conf);
+
+ job.setJarByClass(RandomTextWriterJob.class);
+ job.setJobName("random-text-writer");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(RandomInputFormat.class);
+ job.setMapperClass(RandomTextMapper.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ //FileOutputFormat.setOutputPath(job, new Path("random-output"));
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static class RandomInputFormat extends InputFormat<Text, Text> {
+
+ /**
+ * Generate the requested number of file splits, with the filename
+ * set to the filename of the output file.
+ */
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> result = new ArrayList<InputSplit>();
+ Path outDir = FileOutputFormat.getOutputPath(job);
+ int numSplits =
+ job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+ for(int i=0; i < numSplits; ++i) {
+ result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
+ (String[])null));
+ }
+ return result;
+ }
+
+ /**
+ * Return a single record (filename, "") where the filename is taken from
+ * the file split.
+ */
+ public static class RandomRecordReader extends RecordReader<Text, Text> {
+ Path name;
+ Text key = null;
+ Text value = new Text();
+ public RandomRecordReader(Path p) {
+ name = p;
+ }
+
+ public void initialize(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ }
+
+ public boolean nextKeyValue() {
+ if (name != null) {
+ key = new Text();
+ key.set(name.getName());
+ name = null;
+ return true;
+ }
+ return false;
+ }
+
+ public Text getCurrentKey() {
+ return key;
+ }
+
+ public Text getCurrentValue() {
+ return value;
+ }
+
+ public void close() {}
+
+ public float getProgress() {
+ return 0.0f;
+ }
+ }
+
+ public RecordReader<Text, Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new RandomRecordReader(((FileSplit) split).getPath());
+ }
+ }
+
+ public static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
+
+ private long numBytesToWrite;
+ private int minWordsInKey;
+ private int wordsInKeyRange;
+ private int minWordsInValue;
+ private int wordsInValueRange;
+ private Random random = new Random();
+
+ /**
+ * Save the configuration value that we need to write the data.
+ */
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ numBytesToWrite = conf.getLong(BYTES_PER_MAP,
+ 1*1024*1024*1024);
+ minWordsInKey = conf.getInt(MIN_KEY, 5);
+ wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey);
+ minWordsInValue = conf.getInt(MIN_VALUE, 10);
+ wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue);
+ }
+
+ /**
+ * Given an output filename, write a bunch of random records to it.
+ */
+ public void map(Text key, Text value,
+ Context context) throws IOException,InterruptedException {
+ int itemCount = 0;
+ while (numBytesToWrite > 0) {
+ // Generate the key/value
+ int noWordsKey = minWordsInKey +
+ (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
+ int noWordsValue = minWordsInValue +
+ (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
+ Text keyWords = generateSentence(noWordsKey);
+ Text valueWords = generateSentence(noWordsValue);
+
+ // Write the sentence
+ context.write(keyWords, valueWords);
+
+ numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
+
+ // Update counters, progress etc.
+ context.getCounter(Counters.BYTES_WRITTEN).increment(
+ keyWords.getLength() + valueWords.getLength());
+ context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+ if (++itemCount % 200 == 0) {
+ context.setStatus("wrote record " + itemCount + ". " +
+ numBytesToWrite + " bytes left.");
+ }
+ }
+ context.setStatus("done with " + itemCount + " records.");
+ }
+
+ private Text generateSentence(int noWords) {
+ StringBuffer sentence = new StringBuffer();
+ String space = " ";
+ for (int i=0; i < noWords; ++i) {
+ sentence.append(words[random.nextInt(words.length)]);
+ sentence.append(space);
+ }
+ return new Text(sentence.toString());
+ }
+
+ private static String[] words = {
+ "diurnalness", "Homoiousian",
+ "spiranthic", "tetragynian",
+ "silverhead", "ungreat",
+ "lithograph", "exploiter",
+ "physiologian", "by",
+ "hellbender", "Filipendula",
+ "undeterring", "antiscolic",
+ "pentagamist", "hypoid",
+ "cacuminal", "sertularian",
+ "schoolmasterism", "nonuple",
+ "gallybeggar", "phytonic",
+ "swearingly", "nebular",
+ "Confervales", "thermochemically",
+ "characinoid", "cocksuredom",
+ "fallacious", "feasibleness",
+ "debromination", "playfellowship",
+ "tramplike", "testa",
+ "participatingly", "unaccessible",
+ "bromate", "experientialist",
+ "roughcast", "docimastical",
+ "choralcelo", "blightbird",
+ "peptonate", "sombreroed",
+ "unschematized", "antiabolitionist",
+ "besagne", "mastication",
+ "bromic", "sviatonosite",
+ "cattimandoo", "metaphrastical",
+ "endotheliomyoma", "hysterolysis",
+ "unfulminated", "Hester",
+ "oblongly", "blurredness",
+ "authorling", "chasmy",
+ "Scorpaenidae", "toxihaemia",
+ "Dictograph", "Quakerishly",
+ "deaf", "timbermonger",
+ "strammel", "Thraupidae",
+ "seditious", "plerome",
+ "Arneb", "eristically",
+ "serpentinic", "glaumrie",
+ "socioromantic", "apocalypst",
+ "tartrous", "Bassaris",
+ "angiolymphoma", "horsefly",
+ "kenno", "astronomize",
+ "euphemious", "arsenide",
+ "untongued", "parabolicness",
+ "uvanite", "helpless",
+ "gemmeous", "stormy",
+ "templar", "erythrodextrin",
+ "comism", "interfraternal",
+ "preparative", "parastas",
+ "frontoorbital", "Ophiosaurus",
+ "diopside", "serosanguineous",
+ "ununiformly", "karyological",
+ "collegian", "allotropic",
+ "depravity", "amylogenesis",
+ "reformatory", "epidymides",
+ "pleurotropous", "trillium",
+ "dastardliness", "coadvice",
+ "embryotic", "benthonic",
+ "pomiferous", "figureheadship",
+ "Megaluridae", "Harpa",
+ "frenal", "commotion",
+ "abthainry", "cobeliever",
+ "manilla", "spiciferous",
+ "nativeness", "obispo",
+ "monilioid", "biopsic",
+ "valvula", "enterostomy",
+ "planosubulate", "pterostigma",
+ "lifter", "triradiated",
+ "venialness", "tum",
+ "archistome", "tautness",
+ "unswanlike", "antivenin",
+ "Lentibulariaceae", "Triphora",
+ "angiopathy", "anta",
+ "Dawsonia", "becomma",
+ "Yannigan", "winterproof",
+ "antalgol", "harr",
+ "underogating", "ineunt",
+ "cornberry", "flippantness",
+ "scyphostoma", "approbation",
+ "Ghent", "Macraucheniidae",
+ "scabbiness", "unanatomized",
+ "photoelasticity", "eurythermal",
+ "enation", "prepavement",
+ "flushgate", "subsequentially",
+ "Edo", "antihero",
+ "Isokontae", "unforkedness",
+ "porriginous", "daytime",
+ "nonexecutive", "trisilicic",
+ "morphiomania", "paranephros",
+ "botchedly", "impugnation",
+ "Dodecatheon", "obolus",
+ "unburnt", "provedore",
+ "Aktistetae", "superindifference",
+ "Alethea", "Joachimite",
+ "cyanophilous", "chorograph",
+ "brooky", "figured",
+ "periclitation", "quintette",
+ "hondo", "ornithodelphous",
+ "unefficient", "pondside",
+ "bogydom", "laurinoxylon",
+ "Shiah", "unharmed",
+ "cartful", "noncrystallized",
+ "abusiveness", "cromlech",
+ "japanned", "rizzomed",
+ "underskin", "adscendent",
+ "allectory", "gelatinousness",
+ "volcano", "uncompromisingly",
+ "cubit", "idiotize",
+ "unfurbelowed", "undinted",
+ "magnetooptics", "Savitar",
+ "diwata", "ramosopalmate",
+ "Pishquow", "tomorn",
+ "apopenptic", "Haversian",
+ "Hysterocarpus", "ten",
+ "outhue", "Bertat",
+ "mechanist", "asparaginic",
+ "velaric", "tonsure",
+ "bubble", "Pyrales",
+ "regardful", "glyphography",
+ "calabazilla", "shellworker",
+ "stradametrical", "havoc",
+ "theologicopolitical", "sawdust",
+ "diatomaceous", "jajman",
+ "temporomastoid", "Serrifera",
+ "Ochnaceae", "aspersor",
+ "trailmaking", "Bishareen",
+ "digitule", "octogynous",
+ "epididymitis", "smokefarthings",
+ "bacillite", "overcrown",
+ "mangonism", "sirrah",
+ "undecorated", "psychofugal",
+ "bismuthiferous", "rechar",
+ "Lemuridae", "frameable",
+ "thiodiazole", "Scanic",
+ "sportswomanship", "interruptedness",
+ "admissory", "osteopaedion",
+ "tingly", "tomorrowness",
+ "ethnocracy", "trabecular",
+ "vitally", "fossilism",
+ "adz", "metopon",
+ "prefatorial", "expiscate",
+ "diathermacy", "chronist",
+ "nigh", "generalizable",
+ "hysterogen", "aurothiosulphuric",
+ "whitlowwort", "downthrust",
+ "Protestantize", "monander",
+ "Itea", "chronographic",
+ "silicize", "Dunlop",
+ "eer", "componental",
+ "spot", "pamphlet",
+ "antineuritic", "paradisean",
+ "interruptor", "debellator",
+ "overcultured", "Florissant",
+ "hyocholic", "pneumatotherapy",
+ "tailoress", "rave",
+ "unpeople", "Sebastian",
+ "thermanesthesia", "Coniferae",
+ "swacking", "posterishness",
+ "ethmopalatal", "whittle",
+ "analgize", "scabbardless",
+ "naught", "symbiogenetically",
+ "trip", "parodist",
+ "columniform", "trunnel",
+ "yawler", "goodwill",
+ "pseudohalogen", "swangy",
+ "cervisial", "mediateness",
+ "genii", "imprescribable",
+ "pony", "consumptional",
+ "carposporangial", "poleax",
+ "bestill", "subfebrile",
+ "sapphiric", "arrowworm",
+ "qualminess", "ultraobscure",
+ "thorite", "Fouquieria",
+ "Bermudian", "prescriber",
+ "elemicin", "warlike",
+ "semiangle", "rotular",
+ "misthread", "returnability",
+ "seraphism", "precostal",
+ "quarried", "Babylonism",
+ "sangaree", "seelful",
+ "placatory", "pachydermous",
+ "bozal", "galbulus",
+ "spermaphyte", "cumbrousness",
+ "pope", "signifier",
+ "Endomycetaceae", "shallowish",
+ "sequacity", "periarthritis",
+ "bathysphere", "pentosuria",
+ "Dadaism", "spookdom",
+ "Consolamentum", "afterpressure",
+ "mutter", "louse",
+ "ovoviviparous", "corbel",
+ "metastoma", "biventer",
+ "Hydrangea", "hogmace",
+ "seizing", "nonsuppressed",
+ "oratorize", "uncarefully",
+ "benzothiofuran", "penult",
+ "balanocele", "macropterous",
+ "dishpan", "marten",
+ "absvolt", "jirble",
+ "parmelioid", "airfreighter",
+ "acocotl", "archesporial",
+ "hypoplastral", "preoral",
+ "quailberry", "cinque",
+ "terrestrially", "stroking",
+ "limpet", "moodishness",
+ "canicule", "archididascalian",
+ "pompiloid", "overstaid",
+ "introducer", "Italical",
+ "Christianopaganism", "prescriptible",
+ "subofficer", "danseuse",
+ "cloy", "saguran",
+ "frictionlessly", "deindividualization",
+ "Bulanda", "ventricous",
+ "subfoliar", "basto",
+ "scapuloradial", "suspend",
+ "stiffish", "Sphenodontidae",
+ "eternal", "verbid",
+ "mammonish", "upcushion",
+ "barkometer", "concretion",
+ "preagitate", "incomprehensible",
+ "tristich", "visceral",
+ "hemimelus", "patroller",
+ "stentorophonic", "pinulus",
+ "kerykeion", "brutism",
+ "monstership", "merciful",
+ "overinstruct", "defensibly",
+ "bettermost", "splenauxe",
+ "Mormyrus", "unreprimanded",
+ "taver", "ell",
+ "proacquittal", "infestation",
+ "overwoven", "Lincolnlike",
+ "chacona", "Tamil",
+ "classificational", "lebensraum",
+ "reeveland", "intuition",
+ "Whilkut", "focaloid",
+ "Eleusinian", "micromembrane",
+ "byroad", "nonrepetition",
+ "bacterioblast", "brag",
+ "ribaldrous", "phytoma",
+ "counteralliance", "pelvimetry",
+ "pelf", "relaster",
+ "thermoresistant", "aneurism",
+ "molossic", "euphonym",
+ "upswell", "ladhood",
+ "phallaceous", "inertly",
+ "gunshop", "stereotypography",
+ "laryngic", "refasten",
+ "twinling", "oflete",
+ "hepatorrhaphy", "electrotechnics",
+ "cockal", "guitarist",
+ "topsail", "Cimmerianism",
+ "larklike", "Llandovery",
+ "pyrocatechol", "immatchable",
+ "chooser", "metrocratic",
+ "craglike", "quadrennial",
+ "nonpoisonous", "undercolored",
+ "knob", "ultratense",
+ "balladmonger", "slait",
+ "sialadenitis", "bucketer",
+ "magnificently", "unstipulated",
+ "unscourged", "unsupercilious",
+ "packsack", "pansophism",
+ "soorkee", "percent",
+ "subirrigate", "champer",
+ "metapolitics", "spherulitic",
+ "involatile", "metaphonical",
+ "stachyuraceous", "speckedness",
+ "bespin", "proboscidiform",
+ "gul", "squit",
+ "yeelaman", "peristeropode",
+ "opacousness", "shibuichi",
+ "retinize", "yote",
+ "misexposition", "devilwise",
+ "pumpkinification", "vinny",
+ "bonze", "glossing",
+ "decardinalize", "transcortical",
+ "serphoid", "deepmost",
+ "guanajuatite", "wemless",
+ "arval", "lammy",
+ "Effie", "Saponaria",
+ "tetrahedral", "prolificy",
+ "excerpt", "dunkadoo",
+ "Spencerism", "insatiately",
+ "Gilaki", "oratorship",
+ "arduousness", "unbashfulness",
+ "Pithecolobium", "unisexuality",
+ "veterinarian", "detractive",
+ "liquidity", "acidophile",
+ "proauction", "sural",
+ "totaquina", "Vichyite",
+ "uninhabitedness", "allegedly",
+ "Gothish", "manny",
+ "Inger", "flutist",
+ "ticktick", "Ludgatian",
+ "homotransplant", "orthopedical",
+ "diminutively", "monogoneutic",
+ "Kenipsim", "sarcologist",
+ "drome", "stronghearted",
+ "Fameuse", "Swaziland",
+ "alen", "chilblain",
+ "beatable", "agglomeratic",
+ "constitutor", "tendomucoid",
+ "porencephalous", "arteriasis",
+ "boser", "tantivy",
+ "rede", "lineamental",
+ "uncontradictableness", "homeotypical",
+ "masa", "folious",
+ "dosseret", "neurodegenerative",
+ "subtransverse", "Chiasmodontidae",
+ "palaeotheriodont", "unstressedly",
+ "chalcites", "piquantness",
+ "lampyrine", "Aplacentalia",
+ "projecting", "elastivity",
+ "isopelletierin", "bladderwort",
+ "strander", "almud",
+ "iniquitously", "theologal",
+ "bugre", "chargeably",
+ "imperceptivity", "meriquinoidal",
+ "mesophyte", "divinator",
+ "perfunctory", "counterappellant",
+ "synovial", "charioteer",
+ "crystallographical", "comprovincial",
+ "infrastapedial", "pleasurehood",
+ "inventurous", "ultrasystematic",
+ "subangulated", "supraoesophageal",
+ "Vaishnavism", "transude",
+ "chrysochrous", "ungrave",
+ "reconciliable", "uninterpleaded",
+ "erlking", "wherefrom",
+ "aprosopia", "antiadiaphorist",
+ "metoxazine", "incalculable",
+ "umbellic", "predebit",
+ "foursquare", "unimmortal",
+ "nonmanufacture", "slangy",
+ "predisputant", "familist",
+ "preaffiliate", "friarhood",
+ "corelysis", "zoonitic",
+ "halloo", "paunchy",
+ "neuromimesis", "aconitine",
+ "hackneyed", "unfeeble",
+ "cubby", "autoschediastical",
+ "naprapath", "lyrebird",
+ "inexistency", "leucophoenicite",
+ "ferrogoslarite", "reperuse",
+ "uncombable", "tambo",
+ "propodiale", "diplomatize",
+ "Russifier", "clanned",
+ "corona", "michigan",
+ "nonutilitarian", "transcorporeal",
+ "bought", "Cercosporella",
+ "stapedius", "glandularly",
+ "pictorially", "weism",
+ "disilane", "rainproof",
+ "Caphtor", "scrubbed",
+ "oinomancy", "pseudoxanthine",
+ "nonlustrous", "redesertion",
+ "Oryzorictinae", "gala",
+ "Mycogone", "reappreciate",
+ "cyanoguanidine", "seeingness",
+ "breadwinner", "noreast",
+ "furacious", "epauliere",
+ "omniscribent", "Passiflorales",
+ "uninductive", "inductivity",
+ "Orbitolina", "Semecarpus",
+ "migrainoid", "steprelationship",
+ "phlogisticate", "mesymnion",
+ "sloped", "edificator",
+ "beneficent", "culm",
+ "paleornithology", "unurban",
+ "throbless", "amplexifoliate",
+ "sesquiquintile", "sapience",
+ "astucious", "dithery",
+ "boor", "ambitus",
+ "scotching", "uloid",
+ "uncompromisingness", "hoove",
+ "waird", "marshiness",
+ "Jerusalem", "mericarp",
+ "unevoked", "benzoperoxide",
+ "outguess", "pyxie",
+ "hymnic", "euphemize",
+ "mendacity", "erythremia",
+ "rosaniline", "unchatteled",
+ "lienteria", "Bushongo",
+ "dialoguer", "unrepealably",
+ "rivethead", "antideflation",
+ "vinegarish", "manganosiderite",
+ "doubtingness", "ovopyriform",
+ "Cephalodiscus", "Muscicapa",
+ "Animalivora", "angina",
+ "planispheric", "ipomoein",
+ "cuproiodargyrite", "sandbox",
+ "scrat", "Munnopsidae",
+ "shola", "pentafid",
+ "overstudiousness", "times",
+ "nonprofession", "appetible",
+ "valvulotomy", "goladar",
+ "uniarticular", "oxyterpene",
+ "unlapsing", "omega",
+ "trophonema", "seminonflammable",
+ "circumzenithal", "starer",
+ "depthwise", "liberatress",
+ "unleavened", "unrevolting",
+ "groundneedle", "topline",
+ "wandoo", "umangite",
+ "ordinant", "unachievable",
+ "oversand", "snare",
+ "avengeful", "unexplicit",
+ "mustafina", "sonable",
+ "rehabilitative", "eulogization",
+ "papery", "technopsychology",
+ "impressor", "cresylite",
+ "entame", "transudatory",
+ "scotale", "pachydermatoid",
+ "imaginary", "yeat",
+ "slipped", "stewardship",
+ "adatom", "cockstone",
+ "skyshine", "heavenful",
+ "comparability", "exprobratory",
+ "dermorhynchous", "parquet",
+ "cretaceous", "vesperal",
+ "raphis", "undangered",
+ "Glecoma", "engrain",
+ "counteractively", "Zuludom",
+ "orchiocatabasis", "Auriculariales",
+ "warriorwise", "extraorganismal",
+ "overbuilt", "alveolite",
+ "tetchy", "terrificness",
+ "widdle", "unpremonished",
+ "rebilling", "sequestrum",
+ "equiconvex", "heliocentricism",
+ "catabaptist", "okonite",
+ "propheticism", "helminthagogic",
+ "calycular", "giantly",
+ "wingable", "golem",
+ "unprovided", "commandingness",
+ "greave", "haply",
+ "doina", "depressingly",
+ "subdentate", "impairment",
+ "decidable", "neurotrophic",
+ "unpredict", "bicorporeal",
+ "pendulant", "flatman",
+ "intrabred", "toplike",
+ "Prosobranchiata", "farrantly",
+ "toxoplasmosis", "gorilloid",
+ "dipsomaniacal", "aquiline",
+ "atlantite", "ascitic",
+ "perculsive", "prospectiveness",
+ "saponaceous", "centrifugalization",
+ "dinical", "infravaginal",
+ "beadroll", "affaite",
+ "Helvidian", "tickleproof",
+ "abstractionism", "enhedge",
+ "outwealth", "overcontribute",
+ "coldfinch", "gymnastic",
+ "Pincian", "Munychian",
+ "codisjunct", "quad",
+ "coracomandibular", "phoenicochroite",
+ "amender", "selectivity",
+ "putative", "semantician",
+ "lophotrichic", "Spatangoidea",
+ "saccharogenic", "inferent",
+ "Triconodonta", "arrendation",
+ "sheepskin", "taurocolla",
+ "bunghole", "Machiavel",
+ "triakistetrahedral", "dehairer",
+ "prezygapophysial", "cylindric",
+ "pneumonalgia", "sleigher",
+ "emir", "Socraticism",
+ "licitness", "massedly",
+ "instructiveness", "sturdied",
+ "redecrease", "starosta",
+ "evictor", "orgiastic",
+ "squdge", "meloplasty",
+ "Tsonecan", "repealableness",
+ "swoony", "myesthesia",
+ "molecule", "autobiographist",
+ "reciprocation", "refective",
+ "unobservantness", "tricae",
+ "ungouged", "floatability",
+ "Mesua", "fetlocked",
+ "chordacentrum", "sedentariness",
+ "various", "laubanite",
+ "nectopod", "zenick",
+ "sequentially", "analgic",
+ "biodynamics", "posttraumatic",
+ "nummi", "pyroacetic",
+ "bot", "redescend",
+ "dispermy", "undiffusive",
+ "circular", "trillion",
+ "Uraniidae", "ploration",
+ "discipular", "potentness",
+ "sud", "Hu",
+ "Eryon", "plugger",
+ "subdrainage", "jharal",
+ "abscission", "supermarket",
+ "countergabion", "glacierist",
+ "lithotresis", "minniebush",
+ "zanyism", "eucalypteol",
+ "sterilely", "unrealize",
+ "unpatched", "hypochondriacism",
+ "critically", "cheesecutter",
+ };
+ }
+
+ /**
+  * Launches the random text writer job and blocks until it finishes.
+  *
+  * <p>The first argument is used as the job's output path. The start time,
+  * the end time and the elapsed wall-clock seconds are printed to standard
+  * output. How many maps run and how much data each of them writes is
+  * decided by {@code createJob} and the job configuration, not here.
+  *
+  * @param args command-line arguments; {@code args[0]} is the output path
+  * @return the value of {@code printUsage()} when no arguments are given,
+  *         otherwise 0 if the job succeeded and 1 if it did not
+  * @throws Exception if creating, submitting or waiting for the job fails
+  */
+ public int run(String[] args) throws Exception {
+   if (args.length == 0) {
+     return printUsage();
+   }
+
+   final Job job = createJob(getConf());
+   final Path outputDir = new Path(args[0]);
+   FileOutputFormat.setOutputPath(job, outputDir);
+
+   final Date startTime = new Date();
+   System.out.println("Job started: " + startTime);
+   final boolean succeeded = job.waitForCompletion(true);
+   final Date endTime = new Date();
+   System.out.println("Job ended: " + endTime);
+
+   final long elapsedSeconds =
+       (endTime.getTime() - startTime.getTime()) / 1000;
+   System.out.println("The job took " + elapsedSeconds + " seconds.");
+
+   return succeeded ? 0 : 1;
+ }
+
+ static int printUsage() {
+ System.out.println("randomtextwriter " +
+ "[-outFormat <output format class>] " +
+ "<output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return 2;
+ }
+
+ /**
+  * Command-line entry point: runs this tool through {@link ToolRunner} with
+  * a fresh {@link Configuration} and exits the JVM with the tool's result.
+  *
+  * @param args command-line arguments forwarded to the tool
+  * @throws Exception if running the tool fails
+  */
+ public static void main(String[] args) throws Exception {
+   final Configuration conf = new Configuration();
+   final RandomTextWriterJob tool = new RandomTextWriterJob();
+   final int exitCode = ToolRunner.run(conf, tool, args);
+   System.exit(exitCode);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java?rev=1126981&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/SleepJob.java Tue May 24 11:19:25 2011
@@ -0,0 +1,273 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing MR framework. Sleeps for a defined period
+ * of time in mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that generated number of input pairs is in the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class SleepJob extends Configured implements Tool {
+ public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count"; // conf key: number of records (one sleep each) per map task
+ public static String REDUCE_SLEEP_COUNT =
+ "mapreduce.sleepjob.reduce.sleep.count"; // conf key: number of sleeps the reduce sleep time is divided into
+ public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time"; // conf key: total map sleep time (ms), divided evenly by the map sleep count
+ public static String REDUCE_SLEEP_TIME =
+ "mapreduce.sleepjob.reduce.sleep.time"; // conf key: total reduce sleep time (ms), divided evenly by the reduce sleep count
+
+ /**
+  * Routes each key to the partition given by its integer value modulo the
+  * number of partitions.
+  */
+ public static class SleepJobPartitioner extends
+     Partitioner<IntWritable, NullWritable> {
+   /**
+    * Picks the partition for a key.
+    *
+    * @param k key whose integer value selects the partition
+    * @param v ignored
+    * @param numPartitions total number of partitions
+    * @return a non-negative partition index smaller than
+    *         {@code numPartitions}
+    */
+   @Override
+   public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+     // Java's % keeps the sign of the dividend, so a negative key would give
+     // a negative (invalid) partition. Clearing the sign bit leaves
+     // non-negative keys untouched and keeps the result in range.
+     return (k.get() & Integer.MAX_VALUE) % numPartitions;
+   }
+ }
+
+ public static class EmptySplit extends InputSplit implements Writable {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() { return 0L; }
+ public String[] getLocations() { return new String[0]; }
+ }
+
+ /**
+  * Input format that fabricates the job's input instead of reading files:
+  * one {@link EmptySplit} per configured map, and a record reader that
+  * synthesizes (key, value) pairs from the sleep-count settings.
+  */
+ public static class SleepInputFormat
+     extends InputFormat<IntWritable,IntWritable> {
+
+   /**
+    * Creates one empty split per map task.
+    *
+    * @param jobContext supplies the configuration holding
+    *        {@link MRJobConfig#NUM_MAPS} (1 is used when it is unset)
+    * @return a list with one {@link EmptySplit} per requested map
+    */
+   @Override
+   public List<InputSplit> getSplits(JobContext jobContext) {
+     List<InputSplit> ret = new ArrayList<InputSplit>();
+     int numSplits = jobContext.getConfiguration().
+         getInt(MRJobConfig.NUM_MAPS, 1);
+     for (int i = 0; i < numSplits; ++i) {
+       ret.add(new EmptySplit());
+     }
+     return ret;
+   }
+
+   /**
+    * Builds a reader that yields {@code MAP_SLEEP_COUNT} records. Across
+    * those records the values add up to
+    * {@code REDUCE_SLEEP_COUNT * numReduceTasks}; each key is the running
+    * total of the values handed out before it.
+    *
+    * @param ignored the split; it is not consulted
+    * @param taskContext supplies the configuration and the reduce-task count
+    * @return the synthetic record reader
+    * @throws IOException if either configured sleep count is negative
+    */
+   @Override
+   public RecordReader<IntWritable,IntWritable> createRecordReader(
+       InputSplit ignored, TaskAttemptContext taskContext)
+       throws IOException {
+     Configuration conf = taskContext.getConfiguration();
+     final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+     if (count < 0) throw new IOException("Invalid map count: " + count);
+     final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
+     if (redcount < 0)
+       throw new IOException("Invalid reduce count: " + redcount);
+     // Total number of keys a single map task has to emit.
+     final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+     return new RecordReader<IntWritable,IntWritable>() {
+       private int records = 0;
+       private int emitCount = 0;
+       private IntWritable key = null;
+       private IntWritable value = null;
+
+       @Override
+       public void initialize(InputSplit split, TaskAttemptContext context) {
+       }
+
+       @Override
+       public boolean nextKeyValue()
+           throws IOException {
+         if (count == 0) {
+           // A zero count is accepted above; there is nothing to read, and
+           // returning here avoids the division by zero below.
+           return false;
+         }
+         key = new IntWritable();
+         key.set(emitCount);
+         // Spread emitPerMapTask over count records; the first
+         // (emitPerMapTask % count) records take one extra.
+         int emit = emitPerMapTask / count;
+         if ((emitPerMapTask) % count > records) {
+           ++emit;
+         }
+         emitCount += emit;
+         value = new IntWritable();
+         value.set(emit);
+         return records++ < count;
+       }
+
+       @Override
+       public IntWritable getCurrentKey() { return key; }
+
+       @Override
+       public IntWritable getCurrentValue() { return value; }
+
+       @Override
+       public void close() throws IOException { }
+
+       @Override
+       public float getProgress() throws IOException {
+         if (count == 0) {
+           // Nothing to read, so report completion instead of 0/0 (NaN).
+           return 1.0f;
+         }
+         // records is bumped once more by the final nextKeyValue() call that
+         // returns false, so cap the ratio at 1.
+         return Math.min(1.0f, records / ((float)count));
+       }
+     };
+   }
+ }
+
+ /**
+  * Mapper that sleeps once per input record and then emits as many
+  * consecutive integer keys as the record's value asks for.
+  */
+ public static class SleepMapper
+     extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+   private long mapSleepDuration = 100;
+   private int mapSleepCount = 1;
+   private int count = 0;
+
+   /**
+    * Reads the map sleep count and derives the per-record sleep duration by
+    * dividing the total map sleep time by that count.
+    *
+    * @param context supplies the job configuration
+    */
+   @Override
+   protected void setup(Context context)
+       throws IOException, InterruptedException {
+     Configuration conf = context.getConfiguration();
+     this.mapSleepCount =
+         conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+     // A sleep count of zero is a legal setting (e.g. a zero map sleep
+     // time); do not divide by it.
+     this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+         conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+   }
+
+   /**
+    * Sleeps for the per-record duration, then writes {@code value.get()}
+    * keys starting at {@code key.get()}, each with a null value.
+    *
+    * @param key first key to emit
+    * @param value number of consecutive keys to emit
+    * @param context used for status reporting and output
+    * @throws IOException if the sleep is interrupted or writing fails
+    */
+   @Override
+   public void map(IntWritable key, IntWritable value, Context context
+       ) throws IOException, InterruptedException {
+     //it is expected that every map processes mapSleepCount number of records.
+     try {
+       context.setStatus("Sleeping... (" +
+           (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+       Thread.sleep(mapSleepDuration);
+     }
+     catch (InterruptedException ex) {
+       throw new IOException("Interrupted while sleeping", ex);
+     }
+     ++count;
+     // output reduceSleepCount * numReduce number of random values, so that
+     // each reducer will get reduceSleepCount number of keys.
+     int k = key.get();
+     for (int i = 0; i < value.get(); ++i) {
+       context.write(new IntWritable(k + i), NullWritable.get());
+     }
+   }
+ }
+
+ /**
+  * Reducer that sleeps once per key and produces no output.
+  */
+ public static class SleepReducer
+     extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+   private long reduceSleepDuration = 100;
+   private int reduceSleepCount = 1;
+   private int count = 0;
+
+   /**
+    * Reads the reduce sleep count and derives the per-key sleep duration by
+    * dividing the total reduce sleep time by that count.
+    *
+    * @param context supplies the job configuration
+    */
+   @Override
+   protected void setup(Context context)
+       throws IOException, InterruptedException {
+     Configuration conf = context.getConfiguration();
+     this.reduceSleepCount =
+         conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+     // A sleep count of zero is a legal setting (e.g. a zero reduce sleep
+     // time); do not divide by it.
+     this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
+         conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
+   }
+
+   /**
+    * Sleeps for the per-key duration; the values are not read and nothing
+    * is written.
+    *
+    * @param key the current key (unused)
+    * @param values the values for the key (unused)
+    * @param context used for status reporting
+    * @throws IOException if the sleep is interrupted
+    */
+   @Override
+   public void reduce(IntWritable key, Iterable<NullWritable> values,
+       Context context)
+       throws IOException {
+     try {
+       context.setStatus("Sleeping... (" +
+           (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+       Thread.sleep(reduceSleepDuration);
+     }
+     catch (InterruptedException ex) {
+       throw new IOException("Interrupted while sleeping", ex);
+     }
+     count++;
+   }
+ }
+
+ /**
+  * Command-line entry point: runs a {@link SleepJob} through
+  * {@link ToolRunner} with a fresh {@link Configuration} and exits the JVM
+  * with the tool's result.
+  *
+  * @param args command-line arguments forwarded to the tool
+  * @throws Exception if running the tool fails
+  */
+ public static void main(String[] args) throws Exception {
+   final Configuration conf = new Configuration();
+   final SleepJob tool = new SleepJob();
+   final int exitCode = ToolRunner.run(conf, tool, args);
+   System.exit(exitCode);
+ }
+
+ /**
+  * Builds (but does not submit) a sleep job.
+  *
+  * <p>The sleep settings and the map count are written into the
+  * configuration returned by {@code getConf()}, so this tool's
+  * configuration is modified as a side effect.
+  *
+  * @param numMapper number of map tasks (stored under
+  *        {@link MRJobConfig#NUM_MAPS})
+  * @param numReducer number of reduce tasks
+  * @param mapSleepTime total sleep time per map task
+  * @param mapSleepCount number of sleeps the map sleep time is split into
+  * @param reduceSleepTime total sleep time per reduce task
+  * @param reduceSleepCount number of sleeps the reduce sleep time is split
+  *        into
+  * @return the configured job, named "Sleep job"
+  * @throws IOException if the job cannot be created
+  */
+ public Job createJob(int numMapper, int numReducer,
+     long mapSleepTime, int mapSleepCount,
+     long reduceSleepTime, int reduceSleepCount)
+     throws IOException {
+   Configuration conf = getConf();
+   conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+   conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+   conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+   conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+   conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+   Job job = Job.getInstance(conf, "sleep");
+   job.setNumReduceTasks(numReducer);
+   job.setJarByClass(SleepJob.class);
+   job.setMapperClass(SleepMapper.class);
+   job.setMapOutputKeyClass(IntWritable.class);
+   job.setMapOutputValueClass(NullWritable.class);
+   job.setReducerClass(SleepReducer.class);
+   job.setOutputFormatClass(NullOutputFormat.class);
+   job.setInputFormatClass(SleepInputFormat.class);
+   job.setPartitionerClass(SleepJobPartitioner.class);
+   job.setSpeculativeExecution(false);
+   // Replaces the "sleep" name given to Job.getInstance above.
+   job.setJobName("Sleep job");
+   FileInputFormat.addInputPath(job, new Path("ignored"));
+   return job;
+ }
+
+ /**
+  * Parses the command line, builds the sleep job and waits for it.
+  *
+  * <p>Recognized options are {@code -m}, {@code -r}, {@code -mt},
+  * {@code -rt} and {@code -recordt}, each followed by a numeric value.
+  * Unrecognized arguments are ignored. The per-task sleep counts are the
+  * task sleep times divided by the record sleep time, rounded up.
+  *
+  * @param args command-line arguments
+  * @return 2 if the arguments are missing or unusable, otherwise 0 if the
+  *         job succeeded and 1 if it did not
+  * @throws Exception if a numeric value cannot be parsed or the job fails
+  */
+ public int run(String[] args) throws Exception {
+
+   if(args.length < 1) {
+     return printUsage();
+   }
+
+   int numMapper = 1, numReducer = 1;
+   long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+
+   for(int i=0; i < args.length; i++ ) {
+     final String opt = args[i];
+     final boolean known = opt.equals("-m") || opt.equals("-r")
+         || opt.equals("-mt") || opt.equals("-rt") || opt.equals("-recordt");
+     if (!known) {
+       continue;
+     }
+     // Every recognized option needs a value; without this check a trailing
+     // option would fail with ArrayIndexOutOfBoundsException.
+     if (i + 1 >= args.length) {
+       System.err.println("Missing value for option " + opt);
+       return printUsage();
+     }
+     final String val = args[++i];
+     if(opt.equals("-m")) {
+       numMapper = Integer.parseInt(val);
+     }
+     else if(opt.equals("-r")) {
+       numReducer = Integer.parseInt(val);
+     }
+     else if(opt.equals("-mt")) {
+       mapSleepTime = Long.parseLong(val);
+     }
+     else if(opt.equals("-rt")) {
+       reduceSleepTime = Long.parseLong(val);
+     }
+     else {
+       recSleepTime = Long.parseLong(val);
+     }
+   }
+
+   // The record sleep time is the divisor below; zero or a negative value
+   // would turn the counts into Infinity/NaN or negative numbers.
+   if (recSleepTime <= 0) {
+     System.err.println("recordSleepTime must be positive: " + recSleepTime);
+     return printUsage();
+   }
+
+   // sleep for *SleepTime duration in Task by recSleepTime per record
+   int mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+   int reduceSleepCount =
+       (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+   Job job = createJob(numMapper, numReducer, mapSleepTime,
+       mapSleepCount, reduceSleepTime, reduceSleepCount);
+   return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ /**
+  * Prints the command synopsis and the generic Hadoop options to standard
+  * error.
+  *
+  * @return always 2
+  */
+ private static int printUsage() {
+   System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+       " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+       " [-recordt recordSleepTime (msec)]");
+   ToolRunner.printGenericCommandUsage(System.err);
+   return 2;
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue May 24 11:19:25 2011
@@ -19,8 +19,13 @@
package org.apache.hadoop.mapreduce.v2;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
import junit.framework.Assert;
@@ -32,20 +37,29 @@ import org.apache.hadoop.RandomTextWrite
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -64,8 +78,22 @@ public class TestMRJobs {
protected static MiniMRYarnCluster mrCluster;
+ private static Configuration conf = new Configuration();
+ private static FileSystem localFs;
+ static {
+ try {
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ private static Path TEST_ROOT_DIR = new Path("target",
+ TestMRJobs.class.getName() + "-tmpDir").makeQualified(localFs);
+ static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
@BeforeClass
- public static void setup() {
+ public static void setup() throws IOException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -82,6 +110,11 @@ public class TestMRJobs {
// TestMRJobs is for testing non-uberized operation only; see TestUberAM
// for corresponding uberized tests.
mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+ // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+ // workaround the absent public discache.
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@AfterClass
@@ -112,9 +145,9 @@ public class TestMRJobs {
// job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
- // TODO: We should not be setting MRAppJar as job.jar. It should be
- // uploaded separately by YarnRunner.
- job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.setJarByClass(SleepJob.class);
+ job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
@@ -141,9 +174,9 @@ public class TestMRJobs {
Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"random-output");
FileOutputFormat.setOutputPath(job, outputDir);
- // TODO: We should not be setting MRAppJar as job.jar. It should be
- // uploaded separately by YarnRunner.
- job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.setJarByClass(RandomTextWriterJob.class);
+ job.setMaxMapAttempts(1); // speed up failures
job.waitForCompletion(true);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
// Make sure there are three files in the output-dir
@@ -219,9 +252,7 @@ public class TestMRJobs {
FileOutputFormat.setOutputPath(job,
new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
"failmapper-output"));
- // TODO: We should not be setting MRAppJar as job.jar. It should be
- // uploaded separately by YarnRunner.
- job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
return job;
@@ -264,9 +295,7 @@ public class TestMRJobs {
Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
// //Job with reduces
// Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
- // TODO: We should not be setting MRAppJar as job.jar. It should be
- // uploaded separately by YarnRunner.
- job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
return null;
@@ -276,4 +305,105 @@ public class TestMRJobs {
// TODO later: add explicit "isUber()" checks of some sort
}
+ /**
+ * Mapper used by {@code testDistributedCache}. Its {@code setup} runs inside
+ * the task and asserts that the distributed-cache entries registered by the
+ * test are visible there: localized files and archives, extracted archive
+ * contents, classpath additions and the requested symlink.
+ */
+ public static class DistributedCacheChecker extends
+ Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+ /**
+ * Verifies the localized distributed-cache state seen by the task.
+ *
+ * @param context task context supplying the configuration and the
+ * localized cache file/archive paths
+ * @throws IOException if the local file system cannot be obtained or queried
+ */
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ Path[] files = context.getLocalCacheFiles();
+ Path[] archives = context.getLocalCacheArchives();
+ // The localized paths must be checked against the local file system.
+ // LocalFileSystem.get(conf) is the inherited static FileSystem.get(conf),
+ // which returns the configured default file system instead.
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ // Check that 3 files (2 + the app jar) and 2 archives are present
+ Assert.assertEquals(3, files.length);
+ Assert.assertEquals(2, archives.length);
+
+ // Check lengths of the files: the first is the one-byte plain file, the
+ // second is a jar and therefore longer than one byte.
+ Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
+ Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+
+ // Check extraction of the archives
+ Assert.assertTrue(fs.exists(new Path(archives[0],
+ "distributed.jar.inside3")));
+ Assert.assertTrue(fs.exists(new Path(archives[1],
+ "distributed.jar.inside4")));
+
+ // Check the class loaders
+ LOG.info("Java Classpath: " + System.getProperty("java.class.path"));
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ // The second jar (file) and the third jar (archive) were added to the
+ // classpath, so both should be reachable via the class loader. The fourth
+ // jar was registered only as a cache archive, so it must not be visible.
+ Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
+ Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
+ Assert.assertNull(cl.getResource("distributed.jar.inside4"));
+
+ // Check that the symlink for the renaming was created in the cwd and
+ // points at the one-byte file.
+ File symlinkFile = new File("distributed.first.symlink");
+ Assert.assertTrue(symlinkFile.exists());
+ Assert.assertEquals(1, symlinkFile.length());
+ }
+ }
+
+ @Test
+ public void testDistributedCache() throws Exception {
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ // Create a temporary file of length 1.
+ Path first = createTempFile("distributed.first", "x");
+ // Create two jars with a single file inside them.
+ Path second =
+ makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
+ Path third =
+ makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
+ Path fourth =
+ makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
+
+ Job job = Job.getInstance(mrCluster.getConfig());
+ job.setJarByClass(DistributedCacheChecker.class);
+ job.setMapperClass(DistributedCacheChecker.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ FileInputFormat.setInputPaths(job, first);
+ // Creates the Job Configuration
+ job.addCacheFile(
+ new URI(first.toUri().toString() + "#distributed.first.symlink"));
+ job.addFileToClassPath(second);
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.addArchiveToClassPath(third);
+ job.addCacheArchive(fourth.toUri());
+ job.createSymlink();
+ job.setMaxMapAttempts(1); // speed up failures
+
+ job.submit();
+ Assert.assertTrue(job.waitForCompletion(false));
+ }
+
+ /**
+ * Creates a file under {@code TEST_ROOT_DIR} on the local file system,
+ * writes {@code contents} into it and sets its permission to "700".
+ *
+ * @param filename name of the file, relative to {@code TEST_ROOT_DIR}
+ * @param contents text written to the file via {@code writeBytes}
+ * @return the path of the created file
+ * @throws IOException if the file cannot be created, written or re-permissioned
+ */
+ private Path createTempFile(String filename, String contents)
+ throws IOException {
+ Path path = new Path(TEST_ROOT_DIR, filename);
+ FSDataOutputStream os = localFs.create(path);
+ try {
+ os.writeBytes(contents);
+ } finally {
+ // Close even when the write fails so the stream is not leaked.
+ os.close();
+ }
+ localFs.setPermission(path, new FsPermission("700"));
+ return path;
+ }
+
+ /**
+ * Writes a jar at {@code p} containing a single entry named
+ * {@code "distributed.jar.inside" + index}, then sets the jar's permission
+ * to "700".
+ *
+ * @param p location of the jar to create; only the path component of its
+ * URI is used to open the file
+ * @param index suffix that makes the entry name and entry content unique
+ * @return {@code p}, for call chaining
+ * @throws FileNotFoundException if the jar file cannot be opened for writing
+ * @throws IOException if writing the jar or setting its permission fails
+ */
+ private Path makeJar(Path p, int index) throws FileNotFoundException,
+ IOException {
+ FileOutputStream fos =
+ new FileOutputStream(new File(p.toUri().getPath()));
+ try {
+ JarOutputStream jos = new JarOutputStream(fos);
+ try {
+ ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
+ jos.putNextEntry(ze);
+ // Explicit charset so the bytes do not depend on the platform default.
+ jos.write(("inside the jar!" + index).getBytes("UTF-8"));
+ jos.closeEntry();
+ } finally {
+ jos.close();
+ }
+ } finally {
+ // Covers a failure in the JarOutputStream constructor; closing an
+ // already-closed FileOutputStream is harmless.
+ fos.close();
+ }
+ localFs.setPermission(p, new FsPermission("700"));
+ return p;
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Tue May 24 11:19:25 2011
@@ -26,8 +26,12 @@ import junit.framework.Assert;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
import org.apache.hadoop.SleepJob;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -44,6 +48,20 @@ public class TestMRJobsWithHistoryServic
private static MiniMRYarnCluster mrCluster;
+ // Local file system handle shared by the fixture setup below.
+ private static Configuration conf = new Configuration();
+ private static FileSystem localFs;
+ static {
+ try {
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ // Scratch directory named after this test class, so its files do not share
+ // a directory with the fixtures of TestMRJobs.
+ private static Path TEST_ROOT_DIR = new Path("target",
+ TestMRJobsWithHistoryService.class.getName() + "-tmpDir")
+ .makeQualified(localFs);
+ // Location of this test's own copy of the MRAppJar.
+ static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
+
@Before
public void setup() throws InterruptedException, IOException {
@@ -58,6 +76,11 @@ public class TestMRJobsWithHistoryServic
mrCluster.init(new Configuration());
mrCluster.start();
}
+
+ // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+ // workaround the absent public discache.
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
}
@After
@@ -86,7 +109,8 @@ public class TestMRJobsWithHistoryServic
sleepJob.setConf(mrCluster.getConfig());
// Job with 3 maps and 2 reduces
Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
- job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.setJarByClass(SleepJob.class);
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
Counters counterMR = job.getCounters();
ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue May 24 11:19:25 2011
@@ -33,13 +33,14 @@ import org.apache.hadoop.mapreduce.TaskC
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.BeforeClass;
+import org.junit.Test;
public class TestUberAM extends TestMRJobs {
private static final Log LOG = LogFactory.getLog(TestUberAM.class);
@BeforeClass
- public static void setup() {
+ public static void setup() throws IOException {
TestMRJobs.setup();
if (mrCluster != null) {
mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
@@ -109,4 +110,10 @@ public class TestUberAM extends TestMRJo
super.testSleepJobWithSecurityOn();
}
+ // TODO: add a distributed-cache test that runs with uber mode enabled.
+ @Override
+ @Test
+ public void testDistributedCache() throws Exception {
+ // Intentionally empty: overrides the inherited test so it is a no-op in this class.
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java Tue May 24 11:19:25 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.hadoop.Clock;
+
/**
* A clock class - can be mocked out for testing.
*/
@@ -33,7 +35,7 @@ class SimulatorClock extends Clock {
}
@Override
- long getTime() {
+ public long getTime() {
return currentTime;
}
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java Tue May 24 11:19:25 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobStatu
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.Clock;
import org.apache.hadoop.mapred.SimulatorJobInProgress;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.security.Credentials;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1126981&r1=1126980&r2=1126981&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue May 24 11:19:25 2011
@@ -235,9 +235,7 @@ public class ContainerManagerImpl extend
public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
- if (LOG.isDebugEnabled()) {
- LOG.debug(" container is " + request);
- }
+ LOG.info(" container is " + request);
// parse credentials
ByteBuffer tokens = launchContext.getContainerTokens();