You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:15 UTC
svn commit: r1513258 [3/4] - in
/hadoop/common/branches/YARN-321/hadoop-mapreduce-project: ./ bin/ conf/
dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Mon Aug 12 21:25:49 2013
@@ -60,7 +60,7 @@ class Fetcher<K,V> extends Thread {
/* Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
- private final Reporter reporter;
+ protected final Reporter reporter;
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
@@ -71,13 +71,13 @@ class Fetcher<K,V> extends Thread {
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
- private final MergeManager<K,V> merger;
- private final ShuffleSchedulerImpl<K,V> scheduler;
- private final ShuffleClientMetrics metrics;
- private final ExceptionReporter exceptionReporter;
- private final int id;
+ protected final MergeManager<K,V> merger;
+ protected final ShuffleSchedulerImpl<K,V> scheduler;
+ protected final ShuffleClientMetrics metrics;
+ protected final ExceptionReporter exceptionReporter;
+ protected final int id;
private static int nextId = 0;
- private final int reduce;
+ protected final int reduce;
private final int connectionTimeout;
private final int readTimeout;
@@ -407,7 +407,14 @@ class Fetcher<K,V> extends Thread {
}
// Get the location for the map output - either in-memory or on-disk
- mapOutput = merger.reserve(mapId, decompressedLength, id);
+ try {
+ mapOutput = merger.reserve(mapId, decompressedLength, id);
+ } catch (IOException ioe) {
+ // kill this reduce attempt
+ ioErrs.increment(1);
+ scheduler.reportLocalError(ioe);
+ return EMPTY_ATTEMPT_ID_ARRAY;
+ }
// Check if we can shuffle *now* ...
if (mapOutput == null) {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Mon Aug 12 21:25:49 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -39,11 +40,11 @@ public class InMemoryReader<K, V> extend
DataInputBuffer memDataIn = new DataInputBuffer();
private int start;
private int length;
-
+
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
- byte[] data, int start, int length)
+ byte[] data, int start, int length, Configuration conf)
throws IOException {
- super(null, null, length - start, null, null);
+ super(conf, null, length - start, null, null);
this.merger = merger;
this.taskAttemptId = taskAttemptId;
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Aug 12 21:25:49 2013
@@ -613,7 +613,7 @@ public class MergeManagerImpl<K, V> impl
fullSize -= size;
Reader<K,V> reader = new InMemoryReader<K,V>(MergeManagerImpl.this,
mo.getMapId(),
- data, 0, (int)size);
+ data, 0, (int)size, jobConf);
inMemorySegments.add(new Segment<K,V>(reader, true,
(mo.isPrimaryMapOutput() ?
mergedMapOutputsCounter : null)));
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Mon Aug 12 21:25:49 2013
@@ -18,10 +18,12 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
@@ -56,6 +58,7 @@ public class Shuffle<K, V> implements Sh
private Progress copyPhase;
private TaskStatus taskStatus;
private Task reduceTask; //Used for status updates
+ private Map<TaskAttemptID, MapOutputFile> localMapFiles;
@Override
public void init(ShuffleConsumerPlugin.Context context) {
@@ -69,6 +72,7 @@ public class Shuffle<K, V> implements Sh
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
+ this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
@@ -103,13 +107,22 @@ public class Shuffle<K, V> implements Sh
eventFetcher.start();
// Start the map-output fetcher threads
- final int numFetchers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+ boolean isLocal = localMapFiles != null;
+ final int numFetchers = isLocal ? 1 :
+ jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
- for (int i=0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
- reporter, metrics, this,
- reduceTask.getShuffleSecret());
- fetchers[i].start();
+ if (isLocal) {
+ fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
+ merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
+ localMapFiles);
+ fetchers[0].start();
+ } else {
+ for (int i=0; i < numFetchers; ++i) {
+ fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
+ reporter, metrics, this,
+ reduceTask.getShuffleSecret());
+ fetchers[i].start();
+ }
}
// Wait for shuffle to complete successfully
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Mon Aug 12 21:25:49 2013
@@ -19,7 +19,9 @@ package org.apache.hadoop.mapreduce.task
import java.io.IOException;
+import java.net.InetAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
@@ -252,6 +254,16 @@ public class ShuffleSchedulerImpl<K,V> i
failedShuffleCounter.increment(1);
}
+
+ public void reportLocalError(IOException ioe) {
+ try {
+ LOG.error("Shuffle failed : local error on this node: "
+ + InetAddress.getLocalHost());
+ } catch (UnknownHostException e) {
+ LOG.error("Shuffle failed : local error on this node");
+ }
+ reporter.reportException(ioe);
+ }
// Notify the JobTracker
// after every read error, if 'reportReadErrorImmediately' is true or
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Aug 12 21:25:49 2013
@@ -1566,4 +1566,16 @@
<description>Whether to use fixed ports with the minicluster</description>
</property>
+<property>
+ <name>mapreduce.jobhistory.admin.address</name>
+ <value>0.0.0.0:10033</value>
+ <description>The address of the History server admin interface.</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.admin.acl</name>
+ <value>*</value>
+ <description>ACL of who can be admin of the History server.</description>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1503799-1513205
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1380921,1507259
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Mon Aug 12 21:25:49 2013
@@ -155,7 +155,7 @@ public class TestShufflePlugin<K, V> {
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
- mockTask, mockMapOutputFile);
+ mockTask, mockMapOutputFile, null);
shuffleConsumerPlugin.init(context);
shuffleConsumerPlugin.run();
shuffleConsumerPlugin.close();
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Mon Aug 12 21:25:49 2013
@@ -24,11 +24,14 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
.toString());
}
+ @Test
+ public void testListLocatedStatus() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setBoolean("fs.test.impl.disable.cache", false);
+ conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+ MockFileSystem mockFs =
+ (MockFileSystem) new Path("test:///").getFileSystem(conf);
+ Assert.assertEquals("listLocatedStatus already called",
+ 0, mockFs.numListLocatedStatusCalls);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+ List<InputSplit> splits = fileInputFormat.getSplits(job);
+ Assert.assertEquals("Input splits are not correct", 2, splits.size());
+ Assert.assertEquals("listLocatedStatuss calls",
+ 1, mockFs.numListLocatedStatusCalls);
+ }
+
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
}
static class MockFileSystem extends RawLocalFileSystem {
+ int numListLocatedStatusCalls = 0;
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
if (f.toString().equals("test:/a1")) {
return new FileStatus[] {
- new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+ new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
} else if (f.toString().equals("test:/a1/a2")) {
return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
throws FileNotFoundException, IOException {
return this.listStatus(f);
}
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+ throws IOException {
+ return new BlockLocation[] {
+ new BlockLocation(new String[] { "localhost:50010" },
+ new String[] { "localhost" }, 0, len) };
+ }
+
+ @Override
+ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+ PathFilter filter) throws FileNotFoundException, IOException {
+ ++numListLocatedStatusCalls;
+ return super.listLocatedStatus(f, filter);
+ }
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java Mon Aug 12 21:25:49 2013
@@ -170,6 +170,7 @@ public class TestTokenCache {
assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
}
+ @SuppressWarnings("deprecation")
@Test
public void testGetTokensForNamenodes() throws IOException,
URISyntaxException {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Mon Aug 12 21:25:49 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -114,6 +115,36 @@ public class TestFetcher {
LOG.info("<<<< " + name.getMethodName());
}
+ @Test
+ public void testReduceOutOfDiskSpace() throws Throwable {
+ LOG.info("testReduceOutOfDiskSpace");
+
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+ r, metrics, except, key, connection);
+
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ header.write(new DataOutputStream(bout));
+
+ ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+
+ when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+ .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+ .thenReturn(replyHash);
+ when(connection.getInputStream()).thenReturn(in);
+
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+ .thenThrow(new DiskErrorException("No disk space available"));
+
+ underTest.copyFromHost(host);
+ verify(ss).reportLocalError(any(IOException.class));
+ }
+
@Test(timeout=30000)
public void testCopyFromHostConnectionTimeout() throws Exception {
when(connection.getInputStream()).thenThrow(
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs-plugins/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-mapreduce-client</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs-plugins</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-mapreduce-client-hs-plugins</name>
<properties>
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-mapreduce-client</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-mapreduce-client-hs</name>
<properties>
@@ -67,6 +67,34 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <imports>
+ <param>
+ ${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto
+ </param>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>HSAdminRefreshProtocol.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java Mon Aug 12 21:25:49 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.jo
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Manages an in memory cache of parsed Job History files.
*/
@@ -58,12 +60,16 @@ public class CachedHistoryStorage extend
this.hsManager = hsManager;
}
- @SuppressWarnings("serial")
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
LOG.info("CachedHistoryStorage Init");
+ createLoadedJobCache(conf);
+ }
+
+ @SuppressWarnings("serial")
+ private void createLoadedJobCache(Configuration conf) {
loadedJobCacheSize = conf.getInt(
JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);
@@ -76,11 +82,25 @@ public class CachedHistoryStorage extend
}
});
}
-
+
+ public void refreshLoadedJobCache() {
+ if (getServiceState() == STATE.STARTED) {
+ setConfig(createConf());
+ createLoadedJobCache(getConfig());
+ } else {
+ LOG.warn("Failed to execute refreshLoadedJobCache: CachedHistoryStorage is not started");
+ }
+ }
+
+ @VisibleForTesting
+ Configuration createConf() {
+ return new Configuration();
+ }
+
public CachedHistoryStorage() {
super(CachedHistoryStorage.class.getName());
}
-
+
private Job loadJob(HistoryFileInfo fileInfo) {
try {
Job job = fileInfo.loadJob();
@@ -98,6 +118,11 @@ public class CachedHistoryStorage extend
}
}
+ @VisibleForTesting
+ Map<JobId, Job> getLoadedJobCache() {
+ return loadedJobCache;
+ }
+
@Override
public Job getFullJob(JobId jobId) {
if (LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Mon Aug 12 21:25:49 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
@@ -61,7 +62,9 @@ import org.apache.hadoop.mapreduce.v2.jo
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@@ -416,7 +419,7 @@ public class HistoryFileManager extends
return historyFile;
}
- private synchronized void delete() throws IOException {
+ protected synchronized void delete() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("deleting " + historyFile + " and " + confFile);
}
@@ -471,8 +474,8 @@ public class HistoryFileManager extends
private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
private FileContext intermediateDoneDirFc; // Intermediate Done Dir
// FileContext
-
- private ThreadPoolExecutor moveToDoneExecutor = null;
+ @VisibleForTesting
+ protected ThreadPoolExecutor moveToDoneExecutor = null;
private long maxHistoryAge = 0;
public HistoryFileManager() {
@@ -524,10 +527,7 @@ public class HistoryFileManager extends
maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
- jobListCache = new JobListCache(conf.getInt(
- JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
- JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE),
- maxHistoryAge);
+ jobListCache = createJobListCache();
serialNumberIndex = new SerialNumberIndex(conf.getInt(
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
@@ -544,6 +544,18 @@ public class HistoryFileManager extends
super.serviceInit(conf);
}
+ @Override
+ public void serviceStop() throws Exception {
+ ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
+ super.serviceStop();
+ }
+
+ protected JobListCache createJobListCache() {
+ return new JobListCache(conf.getInt(
+ JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
+ JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
+ }
+
private void mkdir(FileContext fc, Path path, FsPermission fsp)
throws IOException {
if (!fc.util().exists(path)) {
@@ -656,18 +668,18 @@ public class HistoryFileManager extends
return jhStatusList;
}
- private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
+ protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
FileContext fc) throws IOException {
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
}
-
+
/**
* Finds all history directories with a timestamp component by scanning the
* filesystem. Used when the JobHistory server is started.
*
- * @return
+ * @return list of history directories
*/
- private List<FileStatus> findTimestampedDirectories() throws IOException {
+ protected List<FileStatus> findTimestampedDirectories() throws IOException {
List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
return fsList;
@@ -954,7 +966,7 @@ public class HistoryFileManager extends
}
}
if (!halted) {
- doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+ deleteDir(serialDir);
removeDirectoryFromSerialNumberIndex(serialDir.getPath());
existingDoneSubdirs.remove(serialDir.getPath());
} else {
@@ -962,6 +974,13 @@ public class HistoryFileManager extends
}
}
}
+
+ protected boolean deleteDir(FileStatus serialDir)
+ throws AccessControlException, FileNotFoundException,
+ UnsupportedFileSystemException, IOException {
+ return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
+ }
+
@VisibleForTesting
protected void setMaxHistoryAge(long newValue){
maxHistoryAge=newValue;
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Mon Aug 12 21:25:49 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -71,7 +73,11 @@ public class JobHistory extends Abstract
private HistoryStorage storage = null;
private HistoryFileManager hsManager = null;
-
+ ScheduledFuture<?> futureHistoryCleaner = null;
+
+ //History job cleaner interval
+ private long cleanerInterval;
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
LOG.info("JobHistory Init");
@@ -84,7 +90,7 @@ public class JobHistory extends Abstract
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
- hsManager = new HistoryFileManager();
+ hsManager = createHistoryFileManager();
hsManager.init(conf);
try {
hsManager.initExisting();
@@ -92,9 +98,8 @@ public class JobHistory extends Abstract
throw new YarnRuntimeException("Failed to intialize existing directories", e);
}
- storage = ReflectionUtils.newInstance(conf.getClass(
- JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
- HistoryStorage.class), conf);
+ storage = createHistoryStorage();
+
if (storage instanceof Service) {
((Service) storage).init(conf);
}
@@ -103,6 +108,16 @@ public class JobHistory extends Abstract
super.serviceInit(conf);
}
+ protected HistoryStorage createHistoryStorage() {
+ return ReflectionUtils.newInstance(conf.getClass(
+ JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+ HistoryStorage.class), conf);
+ }
+
+ protected HistoryFileManager createHistoryFileManager() {
+ return new HistoryFileManager();
+ }
+
@Override
protected void serviceStart() throws Exception {
hsManager.start();
@@ -118,19 +133,14 @@ public class JobHistory extends Abstract
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
// Start historyCleaner
- boolean startCleanerService = conf.getBoolean(
- JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
- if (startCleanerService) {
- long runInterval = conf.getLong(
- JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
- JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
- scheduledExecutor
- .scheduleAtFixedRate(new HistoryCleaner(),
- 30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
- }
+ scheduleHistoryCleaner();
super.serviceStart();
}
+ protected int getInitDelaySecs() {
+ return 30;
+ }
+
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping JobHistory");
@@ -225,6 +235,25 @@ public class JobHistory extends Abstract
return storage.getAllPartialJobs();
}
+ public void refreshLoadedJobCache() {
+ if (getServiceState() == STATE.STARTED) {
+ if (storage instanceof CachedHistoryStorage) {
+ ((CachedHistoryStorage) storage).refreshLoadedJobCache();
+ } else {
+ throw new UnsupportedOperationException(storage.getClass().getName()
+ + " is expected to be an instance of "
+ + CachedHistoryStorage.class.getName());
+ }
+ } else {
+ LOG.warn("Failed to execute refreshLoadedJobCache: JobHistory service is not started");
+ }
+ }
+
+ @VisibleForTesting
+ HistoryStorage getHistoryStorage() {
+ return storage;
+ }
+
/**
* Look for a set of partial jobs.
*
@@ -256,6 +285,43 @@ public class JobHistory extends Abstract
fBegin, fEnd, jobState);
}
+ public void refreshJobRetentionSettings() {
+ if (getServiceState() == STATE.STARTED) {
+ conf = createConf();
+ long maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
+ hsManager.setMaxHistoryAge(maxHistoryAge);
+ if (futureHistoryCleaner != null) {
+ futureHistoryCleaner.cancel(false);
+ }
+ futureHistoryCleaner = null;
+ scheduleHistoryCleaner();
+ } else {
+ LOG.warn("Failed to execute refreshJobRetentionSettings : Job History service is not started");
+ }
+ }
+
+ private void scheduleHistoryCleaner() {
+ boolean startCleanerService = conf.getBoolean(
+ JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
+ if (startCleanerService) {
+ cleanerInterval = conf.getLong(
+ JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
+
+ futureHistoryCleaner = scheduledExecutor.scheduleAtFixedRate(
+ new HistoryCleaner(), getInitDelaySecs() * 1000l, cleanerInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ protected Configuration createConf() {
+ return new Configuration();
+ }
+
+ public long getCleanerInterval() {
+ return cleanerInterval;
+ }
// TODO AppContext - Not Required
private ApplicationAttemptId appAttemptID;
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Mon Aug 12 21:25:49 2013
@@ -26,11 +26,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -59,6 +61,7 @@ public class JobHistoryServer extends Co
private JobHistory jobHistoryService;
private JHSDelegationTokenSecretManager jhsDTSecretManager;
private AggregatedLogDeletionService aggLogDelService;
+ private HSAdminServer hsAdminServer;
public JobHistoryServer() {
super(JobHistoryServer.class.getName());
@@ -81,9 +84,11 @@ public class JobHistoryServer extends Co
clientService = new HistoryClientService(historyContext,
this.jhsDTSecretManager);
aggLogDelService = new AggregatedLogDeletionService();
+ hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
addService(jobHistoryService);
addService(clientService);
addService(aggLogDelService);
+ addService(hsAdminServer);
super.serviceInit(config);
}
@@ -135,11 +140,13 @@ public class JobHistoryServer extends Co
return this.clientService;
}
- public static void main(String[] args) {
- Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ static JobHistoryServer launchJobHistoryServer(String[] args) {
+ Thread.
+ setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
+ JobHistoryServer jobHistoryServer = null;
try {
- JobHistoryServer jobHistoryServer = new JobHistoryServer();
+ jobHistoryServer = new JobHistoryServer();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(jobHistoryServer),
SHUTDOWN_HOOK_PRIORITY);
@@ -148,7 +155,12 @@ public class JobHistoryServer extends Co
jobHistoryServer.start();
} catch (Throwable t) {
LOG.fatal("Error starting JobHistoryServer", t);
- System.exit(-1);
+ ExitUtil.terminate(-1, "Error starting JobHistoryServer");
}
+ return jobHistoryServer;
+ }
+
+ public static void main(String[] args) {
+ launchJobHistoryServer(args);
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Mon Aug 12 21:25:49 2013
@@ -70,12 +70,15 @@ public class TestJobHistoryEvents {
((JobHistory)context).start();
Assert.assertTrue( context.getStartTime()>0);
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
-
-
+
+ // get job before stopping JobHistory
+ Job parsedJob = context.getJob(jobId);
+
+ // stop JobHistory
((JobHistory)context).stop();
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
- Job parsedJob = context.getJob(jobId);
-
+
+
Assert.assertEquals("CompletedMaps not correct", 2,
parsedJob.getCompletedMaps());
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Mon Aug 12 21:25:49 2013
@@ -535,7 +535,10 @@ public class TestJobHistoryParsing {
Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
fileInfo = hfm.getFileInfo(jobId);
+ hfm.stop();
Assert.assertNotNull("Unable to locate old job history", fileInfo);
+ Assert.assertTrue("HistoryFileManager not shutdown properly",
+ hfm.moveToDoneExecutor.isTerminated());
} finally {
LOG.info("FINISHED testScanningOldDirs");
}
@@ -636,6 +639,9 @@ public class TestJobHistoryParsing {
// correct live time
hfm.setMaxHistoryAge(-1);
hfm.clean();
+ hfm.stop();
+ Assert.assertTrue("Thread pool shutdown",
+ hfm.moveToDoneExecutor.isTerminated());
// should be deleted !
Assert.assertTrue("file should be deleted ", fileInfo.isDeleted());
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Mon Aug 12 21:25:49 2013
@@ -63,19 +63,16 @@ public class TestJobHistoryServer {
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
-
-
JobHistoryServer historyServer=null;
+
// simple test init/start/stop JobHistoryServer. Status should change.
-
@Test (timeout= 50000 )
public void testStartStopServer() throws Exception {
-
historyServer = new JobHistoryServer();
Configuration config = new Configuration();
historyServer.init(config);
assertEquals(STATE.INITED, historyServer.getServiceState());
- assertEquals(3, historyServer.getServices().size());
+ assertEquals(4, historyServer.getServices().size());
HistoryClientService historyService = historyServer.getClientService();
assertNotNull(historyServer.getClientService());
assertEquals(STATE.INITED, historyService.getServiceState());
@@ -86,15 +83,9 @@ public class TestJobHistoryServer {
historyServer.stop();
assertEquals(STATE.STOPPED, historyServer.getServiceState());
assertNotNull(historyService.getClientHandler().getConnectAddress());
-
-
-
}
-
-
//Test reports of JobHistoryServer. History server should get log files from MRApp and read them
-
@Test (timeout= 50000 )
public void testReports() throws Exception {
Configuration config = new Configuration();
@@ -128,7 +119,6 @@ public class TestJobHistoryServer {
assertEquals(1, jobs.size());
assertEquals("job_0_0000",jobs.keySet().iterator().next().toString());
-
Task task = job.getTasks().values().iterator().next();
TaskAttempt attempt = task.getAttempts().values().iterator().next();
@@ -188,14 +178,14 @@ public class TestJobHistoryServer {
assertEquals("", diagnosticResponse.getDiagnostics(0));
}
- // test main method
+
+ // test launch method
@Test (timeout =60000)
- public void testMainMethod() throws Exception {
+ public void testLaunch() throws Exception {
ExitUtil.disableSystemExit();
try {
- JobHistoryServer.main(new String[0]);
-
+ historyServer = JobHistoryServer.launchJobHistoryServer(new String[0]);
} catch (ExitUtil.ExitException e) {
assertEquals(0,e.status);
ExitUtil.resetFirstExitException();
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-mapreduce-client</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-mapreduce-client-jobclient</name>
<properties>
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Mon Aug 12 21:25:49 2013
@@ -90,7 +90,7 @@ public class NotRunningJob implements MR
return ApplicationReport.newInstance(unknownAppId, unknownAttemptId,
"N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
"N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
}
NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Mon Aug 12 21:25:49 2013
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -75,36 +76,18 @@ public class ResourceMgrDelegate extends
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) {
- this(conf, null);
- }
-
- /**
- * Delegate responsible for communicating with the Resource Manager's
- * {@link ApplicationClientProtocol}.
- * @param conf the configuration object.
- * @param rmAddress the address of the Resource Manager
- */
- public ResourceMgrDelegate(YarnConfiguration conf,
- InetSocketAddress rmAddress) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
- this.rmAddress = rmAddress;
- if (rmAddress == null) {
- client = YarnClient.createYarnClient();
- } else {
- client = YarnClient.createYarnClient(rmAddress);
- }
+ this.client = YarnClient.createYarnClient();
init(conf);
start();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
- if (rmAddress == null) {
- this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+ this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
- }
client.init(conf);
super.serviceInit(conf);
}
@@ -304,6 +287,12 @@ public class ResourceMgrDelegate extends
}
@Override
+ public Token<AMRMTokenIdentifier> getAMRMToken(ApplicationId appId)
+ throws YarnException, IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public List<ApplicationReport> getApplications() throws YarnException,
IOException {
return client.getApplications();
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/TestSlive.java Mon Aug 12 21:25:49 2013
@@ -63,7 +63,9 @@ public class TestSlive {
/** gets the test write location according to the coding guidelines */
private static File getWriteLoc() {
String writeLoc = System.getProperty(TEST_DATA_PROP, "build/test/data/");
- return new File(writeLoc, "slive");
+ File writeDir = new File(writeLoc, "slive");
+ writeDir.mkdirs();
+ return writeDir;
}
/** gets where the MR job places its data + output + results */
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestAuditLogger.java Mon Aug 12 21:25:49 2013
@@ -137,11 +137,12 @@ public class TestAuditLogger extends Tes
/**
* Test {@link AuditLogger} with IP set.
*/
- @SuppressWarnings("deprecation")
public void testAuditLoggerWithIP() throws Exception {
Configuration conf = new Configuration();
// start the IPC server
- Server server = RPC.getServer(new MyTestRPCServer(), "0.0.0.0", 0, conf);
+ Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+ .setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
+ .setPort(0).build();
server.start();
InetSocketAddress addr = NetUtils.getConnectAddress(server);
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Mon Aug 12 21:25:49 2013
@@ -430,7 +430,7 @@ public class TestClientServiceDelegate {
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+ "N/A", 0.0f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
}
private ApplicationReport getRunningApplicationReport(String host, int port) {
@@ -440,7 +440,7 @@ public class TestClientServiceDelegate {
return ApplicationReport.newInstance(appId, attemptId, "user", "queue",
"appname", host, port, null, YarnApplicationState.RUNNING, "diagnostics",
"url", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f,
- YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+ YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
}
private ResourceMgrDelegate getRMDelegate() throws IOException {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java Mon Aug 12 21:25:49 2013
@@ -276,18 +276,16 @@ public class TestJobCounters {
// there are too few spills to combine (2 < 3)
// Each map spills 2^14 records, so maps spill 49152 records, combined.
- // The reduce spill count is composed of the read from one segment and
- // the intermediate merge of the other two. The intermediate merge
+ // The combiner has emitted 24576 records to the reducer; these are all
+ // fetched straight to memory from the map side. The intermediate merge
// adds 8192 records per segment read; again, there are too few spills to
- // combine, so all 16834 are written to disk (total 32768 spilled records
- // for the intermediate merge). The merge into the reduce includes only
- // the unmerged segment, size 8192. Total spilled records in the reduce
- // is 32768 from the merge + 8192 unmerged segment = 40960 records
+ // combine, so all of them are spilled. Total spilled records in the reduce
+ // is 8192 records / map * 3 maps = 24576.
- // Total: map + reduce = 49152 + 40960 = 90112
+ // Total: map + reduce = 49152 + 24576 = 73728
// 3 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 61440 output records
- validateCounters(c1, 90112, 15360, 61440);
+ validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
validateOldFileCounters(c1, inputSize, 61928, 0, 0);
}
@@ -316,12 +314,12 @@ public class TestJobCounters {
// 1st merge: read + write = 8192 * 4
// 2nd merge: read + write = 8192 * 4
// final merge: 0
- // Total reduce: 65536
+ // Total reduce: 32768
- // Total: map + reduce = 2^16 + 2^16 = 131072
+ // Total: map + reduce = 2^16 + 2^15 = 98304
// 4 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 81920 output records
- validateCounters(c1, 131072, 20480, 81920);
+ validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -349,7 +347,7 @@ public class TestJobCounters {
// Total reduce: 45056
// 5 files, 5120 = 5 * 1024 rec/file = 15360 input records
// 4 records/line = 102400 output records
- validateCounters(c1, 147456, 25600, 102400);
+ validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -394,7 +392,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN0"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 90112, 15360, 61440);
+ validateCounters(c1, 73728, 15360, 61440);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -416,7 +414,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN1"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 131072, 20480, 81920);
+ validateCounters(c1, 98304, 20480, 81920);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
@@ -439,7 +437,7 @@ public class TestJobCounters {
job, new Path(OUT_DIR, "outputN2"));
assertTrue(job.waitForCompletion(true));
final Counters c1 = Counters.downgrade(job.getCounters());
- validateCounters(c1, 147456, 25600, 102400);
+ validateCounters(c1, 122880, 25600, 102400);
validateFileCounters(c1, inputSize, 0, 0, 0);
}
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Mon Aug 12 21:25:49 2013
@@ -63,7 +63,7 @@ public class TestKeyFieldBasedComparator
conf.setOutputValueClass(LongWritable.class);
conf.setNumMapTasks(1);
- conf.setNumReduceTasks(2);
+ conf.setNumReduceTasks(1);
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
@@ -101,9 +101,7 @@ public class TestKeyFieldBasedComparator
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line = reader.readLine();
//make sure we get what we expect as the first line, and also
- //that we have two lines (both the lines must end up in the same
- //reducer since the partitioner takes the same key spec for all
- //lines
+ //that we have two lines
if (expect == 1) {
assertTrue(line.startsWith(line1));
} else if (expect == 2) {
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestLocalRunner.java Mon Aug 12 21:25:49 2013
@@ -31,9 +31,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -410,6 +410,7 @@ public class TestLocalRunner extends Tes
}
/** Test case for zero mappers */
+ @Test
public void testEmptyMaps() throws Exception {
Job job = Job.getInstance();
Path outputPath = getOutputPath();
@@ -428,5 +429,145 @@ public class TestLocalRunner extends Tes
boolean success = job.waitForCompletion(true);
assertTrue("Empty job should work", success);
}
+
+ /** @return the directory where numberfiles are written (mapper inputs) */
+ private Path getNumberDirPath() {
+ return new Path(getInputPath(), "numberfiles");
+ }
+
+ /**
+ * Write out an input file containing an integer.
+ *
+ * @param fileNum the file number to write to.
+ * @param value the value to write to the file
+ * @return the path of the written file.
+ */
+ private Path makeNumberFile(int fileNum, int value) throws IOException {
+ Path workDir = getNumberDirPath();
+ Path filePath = new Path(workDir, "file" + fileNum);
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ OutputStream os = fs.create(filePath);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+ w.write("" + value);
+ w.close();
+
+ return filePath;
+ }
+
+ /**
+ * Each record received by this mapper is a number 'n'.
+ * Emit the values [0..n-1]
+ */
+ public static class SequenceMapper
+ extends Mapper<LongWritable, Text, Text, NullWritable> {
+
+ public void map(LongWritable k, Text v, Context c)
+ throws IOException, InterruptedException {
+ int max = Integer.valueOf(v.toString());
+ for (int i = 0; i < max; i++) {
+ c.write(new Text("" + i), NullWritable.get());
+ }
+ }
+ }
+
+ private final static int NUMBER_FILE_VAL = 100;
+
+ /**
+ * Tally up the values and ensure that we got as much data
+ * out as we put in.
+ * Each mapper generated 'NUMBER_FILE_VAL' values (0..NUMBER_FILE_VAL-1).
+ * Verify that across all our reducers we got exactly this much
+ * data back.
+ */
+ private void verifyNumberJob(int numMaps) throws Exception {
+ Path outputDir = getOutputPath();
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ FileStatus [] stats = fs.listStatus(outputDir);
+ int valueSum = 0;
+ for (FileStatus f : stats) {
+ FSDataInputStream istream = fs.open(f.getPath());
+ BufferedReader r = new BufferedReader(new InputStreamReader(istream));
+ String line = null;
+ while ((line = r.readLine()) != null) {
+ valueSum += Integer.valueOf(line.trim());
+ }
+ r.close();
+ }
+
+ int maxVal = NUMBER_FILE_VAL - 1;
+ int expectedPerMapper = maxVal * (maxVal + 1) / 2;
+ int expectedSum = expectedPerMapper * numMaps;
+ LOG.info("expected sum: " + expectedSum + ", got " + valueSum);
+ assertEquals("Didn't get all our results back", expectedSum, valueSum);
+ }
+
+ /**
+ * Run a test which creates a SequenceMapper / IdentityReducer
+ * job over a set of generated number files.
+ */
+ private void doMultiReducerTest(int numMaps, int numReduces,
+ int parallelMaps, int parallelReduces) throws Exception {
+
+ Path in = getNumberDirPath();
+ Path out = getOutputPath();
+
+ // Clear data from any previous tests.
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(out)) {
+ fs.delete(out, true);
+ }
+
+ if (fs.exists(in)) {
+ fs.delete(in, true);
+ }
+
+ for (int i = 0; i < numMaps; i++) {
+ makeNumberFile(i, 100);
+ }
+
+ Job job = Job.getInstance();
+ job.setNumReduceTasks(numReduces);
+
+ job.setMapperClass(SequenceMapper.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+ FileInputFormat.addInputPath(job, in);
+ FileOutputFormat.setOutputPath(job, out);
+
+ LocalJobRunner.setLocalMaxRunningMaps(job, parallelMaps);
+ LocalJobRunner.setLocalMaxRunningReduces(job, parallelReduces);
+
+ boolean result = job.waitForCompletion(true);
+ assertTrue("Job failed!!", result);
+
+ verifyNumberJob(numMaps);
+ }
+
+ @Test
+ public void testOneMapMultiReduce() throws Exception {
+ doMultiReducerTest(1, 2, 1, 1);
+ }
+
+ @Test
+ public void testOneMapMultiParallelReduce() throws Exception {
+ doMultiReducerTest(1, 2, 1, 2);
+ }
+
+ @Test
+ public void testMultiMapOneReduce() throws Exception {
+ doMultiReducerTest(4, 1, 2, 1);
+ }
+
+ @Test
+ public void testMultiMapMultiReduce() throws Exception {
+ doMultiReducerTest(4, 4, 2, 2);
+ }
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Aug 12 21:25:49 2013
@@ -20,23 +20,31 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
import java.util.Set;
-import java.util.zip.GZIPOutputStream;
+import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPOutputStream;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -715,6 +723,69 @@ public class TestCombineFileInputFormat
out.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+
+ public void testNodeDistribution() throws IOException, InterruptedException {
+ DummyInputFormat inFormat = new DummyInputFormat();
+ int numBlocks = 60;
+ long totLength = 0;
+ long blockSize = 100;
+ int numNodes = 10;
+
+ long minSizeNode = 50;
+ long minSizeRack = 50;
+ int maxSplitSize = 200; // 4 blocks per split.
+
+ String[] locations = new String[numNodes];
+ for (int i = 0; i < numNodes; i++) {
+ locations[i] = "h" + i;
+ }
+ String[] racks = new String[0];
+ Path path = new Path("hdfs://file");
+
+ OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+
+ int hostCountBase = 0;
+ // Generate block list. Replication 3 per block.
+ for (int i = 0; i < numBlocks; i++) {
+ int localHostCount = hostCountBase;
+ String[] blockHosts = new String[3];
+ for (int j = 0; j < 3; j++) {
+ int hostNum = localHostCount % numNodes;
+ blockHosts[j] = "h" + hostNum;
+ localHostCount++;
+ }
+ hostCountBase++;
+ blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts,
+ racks);
+ totLength += blockSize;
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+ HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+ HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+ Map<String, Set<OneBlockInfo>> nodeToBlocks = new TreeMap<String, Set<OneBlockInfo>>();
+
+ OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+ nodeToBlocks, rackToNodes);
+
+ inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+ maxSplitSize, minSizeNode, minSizeRack, splits);
+
+ int expectedSplitCount = (int) (totLength / maxSplitSize);
+ Assert.assertEquals(expectedSplitCount, splits.size());
+
+ // Ensure 90+% of the splits have node local blocks.
+ // 100% locality may not always be achieved.
+ int numLocalSplits = 0;
+ for (InputSplit inputSplit : splits) {
+ Assert.assertEquals(maxSplitSize, inputSplit.getLength());
+ if (inputSplit.getLocations().length == 1) {
+ numLocalSplits++;
+ }
+ }
+ Assert.assertTrue(numLocalSplits >= 0.9 * splits.size());
+ }
public void testNodeInputSplit() throws IOException, InterruptedException {
// Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
@@ -744,8 +815,8 @@ public class TestCombineFileInputFormat
new HashMap<String, List<OneBlockInfo>>();
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
- HashMap<String, List<OneBlockInfo>> nodeToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
+ HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+ new HashMap<String, Set<OneBlockInfo>>();
OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
nodeToBlocks, rackToNodes);
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Mon Aug 12 21:25:49 2013
@@ -56,7 +56,7 @@ public class TestMRKeyFieldBasedComparat
conf.set("mapreduce.partition.keypartitioner.options", "-k1.1,1.1");
conf.set(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, " ");
- Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+ Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1,
line1 +"\n" + line2 + "\n");
job.setMapperClass(InverseMapper.class);
job.setReducerClass(Reducer.class);
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Mon Aug 12 21:25:49 2013
@@ -91,8 +91,10 @@ public class TestUmbilicalProtocolWithJo
.when(mockTT).getProtocolSignature(anyString(), anyLong(), anyInt());
JobTokenSecretManager sm = new JobTokenSecretManager();
- final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT,
- ADDRESS, 0, 5, true, conf, sm);
+ final Server server = new RPC.Builder(conf)
+ .setProtocol(TaskUmbilicalProtocol.class).setInstance(mockTT)
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+ .setSecretManager(sm).build();
server.start();
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Mon Aug 12 21:25:49 2013
@@ -151,11 +151,14 @@ public class MiniMRYarnCluster extends M
if (!getConfig().getBoolean(
JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+ String hostname = MiniYARNCluster.getHostname();
// pick free random ports.
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
- MiniYARNCluster.getHostname() + ":0");
+ hostname + ":0");
getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
- MiniYARNCluster.getHostname() + ":0");
+ hostname + ":0");
+ getConfig().set(JHAdminConfig.JHS_ADMIN_ADDRESS,
+ hostname + ":0");
}
historyServer = new JobHistoryServer();
historyServer.init(getConfig());
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Mon Aug 12 21:25:49 2013
@@ -422,16 +422,20 @@ public class TestMRJobs {
@Override
public void setup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
- Path[] files = context.getLocalCacheFiles();
- Path[] archives = context.getLocalCacheArchives();
+ Path[] localFiles = context.getLocalCacheFiles();
+ URI[] files = context.getCacheFiles();
+ Path[] localArchives = context.getLocalCacheArchives();
+ URI[] archives = context.getCacheArchives();
// Check that 4 (2 + appjar + DistrubutedCacheChecker jar) files
// and 2 archives are present
+ Assert.assertEquals(4, localFiles.length);
Assert.assertEquals(4, files.length);
+ Assert.assertEquals(2, localArchives.length);
Assert.assertEquals(2, archives.length);
// Check lengths of the files
- Map<String, Path> filesMap = pathsToMap(files);
+ Map<String, Path> filesMap = pathsToMap(localFiles);
Assert.assertTrue(filesMap.containsKey("distributed.first.symlink"));
Assert.assertEquals(1, localFs.getFileStatus(
filesMap.get("distributed.first.symlink")).getLen());
@@ -440,7 +444,7 @@ public class TestMRJobs {
filesMap.get("distributed.second.jar")).getLen() > 1);
// Check extraction of the archive
- Map<String, Path> archivesMap = pathsToMap(archives);
+ Map<String, Path> archivesMap = pathsToMap(localArchives);
Assert.assertTrue(archivesMap.containsKey("distributed.third.jar"));
Assert.assertTrue(localFs.exists(new Path(
archivesMap.get("distributed.third.jar"), "distributed.jar.inside3")));
Modified: hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestNonExistentJob.java Mon Aug 12 21:25:49 2013
@@ -22,22 +22,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
+import java.io.IOException;
import java.net.InetAddress;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.security.PrivilegedExceptionAction;
public class TestNonExistentJob extends TestCase {
@@ -96,8 +89,13 @@ public class TestNonExistentJob extends
}
public void testGetInvalidJob() throws Exception {
- RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
- assertNull(runJob);
+ try {
+ RunningJob runJob = new JobClient(getJobConf()).getJob(JobID.forName("job_0_0"));
+ fail("Exception is expected to thrown ahead!");
+ } catch (Exception e) {
+ assertTrue(e instanceof IOException);
+ assertTrue(e.getMessage().contains("ApplicationNotFoundException"));
+ }
}
}