You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ka...@apache.org on 2014/07/16 23:30:19 UTC
svn commit: r1611197 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/
Author: kasha
Date: Wed Jul 16 21:30:19 2014
New Revision: 1611197
URL: http://svn.apache.org/r1611197
Log:
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1611197&r1=1611196&r2=1611197&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jul 16 21:30:19 2014
@@ -162,6 +162,9 @@ Release 2.5.0 - UNRELEASED
resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
vinodkv)
+ MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
+ assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1611197&r1=1611196&r2=1611197&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Wed Jul 16 21:30:19 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
}
/**
- * Within the _local_ filesystem (not HDFS), all activity takes place within
- * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
- * and all sub-MapTasks create the same filename ("file.out"). Rename that
- * to something unique (e.g., "map_0.out") to avoid collisions.
- *
- * Longer-term, we'll modify [something] to use TaskAttemptID-based
- * filenames instead of "file.out". (All of this is entirely internal,
- * so there are no particular compatibility issues.)
- */
- private MapOutputFile renameMapOutputForReduce(JobConf conf,
- TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- // move map output to reduce input
- Path mapOut = subMapOutputFile.getOutputFile();
- FileStatus mStatus = localFs.getFileStatus(mapOut);
- Path reduceIn = subMapOutputFile.getInputFileForWrite(
- TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
- Path mapOutIndex = new Path(mapOut.toString() + ".index");
- Path reduceInIndex = new Path(reduceIn.toString() + ".index");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming map output file for task attempt "
- + mapId.toString() + " from original location " + mapOut.toString()
- + " to destination " + reduceIn.toString());
- }
- if (!localFs.mkdirs(reduceIn.getParent())) {
- throw new IOException("Mkdirs failed to create "
- + reduceIn.getParent().toString());
- }
- if (!localFs.rename(mapOut, reduceIn))
- throw new IOException("Couldn't rename " + mapOut);
- if (!localFs.rename(mapOutIndex, reduceInIndex))
- throw new IOException("Couldn't rename " + mapOutIndex);
-
- return new RenamedMapOutputFile(reduceIn);
- }
-
- /**
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
}
} // end EventHandler
-
+
+ /**
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
+ * a subdir inside one of the LOCAL_DIRS
+ * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
+ * to something unique (e.g., "map_0.out") to avoid possible collisions.
+ *
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
+ * filenames instead of "file.out". (All of this is entirely internal,
+ * so there are no particular compatibility issues.)
+ */
+ @VisibleForTesting
+ protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+ TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+ Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+ Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming map output file for task attempt "
+ + mapId.toString() + " from original location " + mapOut.toString()
+ + " to destination " + reduceIn.toString());
+ }
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ if (!localFs.rename(mapOutIndex, reduceInIndex))
+ throw new IOException("Couldn't rename " + mapOutIndex);
+
+ return new RenamedMapOutputFile(reduceIn);
+ }
+
private static class RenamedMapOutputFile extends MapOutputFile {
private Path path;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1611197&r1=1611196&r2=1611197&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Wed Jul 16 21:30:19 2014
@@ -18,17 +18,26 @@
package org.apache.hadoop.mapred;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
public class TestLocalContainerLauncher {
private static final Log LOG =
LogFactory.getLog(TestLocalContainerLauncher.class);
+ private static File testWorkDir;
+ private static final String[] localDirs = new String[2];
+
+ private static void delete(File dir) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+ fs.delete(p, true);
+ }
+
+  /**
+   * Prepares an empty working directory under {@code target/} containing
+   * one sub-directory ("local-0", "local-1", ...) per slot of
+   * {@code localDirs}, and records their paths in that array.
+   *
+   * @throws IOException if stale contents cannot be removed or a directory
+   *           cannot be created
+   */
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName());
+    // File#delete() refuses to remove a non-empty directory, so leftovers
+    // from an aborted earlier run would survive it. Delete recursively.
+    delete(testWorkDir);
+    if (!testWorkDir.mkdirs() && !testWorkDir.isDirectory()) {
+      throw new IOException("Could not create test dir " + testWorkDir);
+    }
+    testWorkDir = testWorkDir.getAbsoluteFile();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      // mkdirs() returns false both on failure and when the directory is
+      // already there; only the former is an error.
+      if (!dir.mkdirs() && !dir.isDirectory()) {
+        throw new IOException("Could not create test dir " + dir);
+      }
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  /**
+   * Deletes the working directory referenced by {@code testWorkDir}, if one
+   * was ever assigned.
+   *
+   * @throws IOException if the delete helper fails
+   */
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir == null) {
+      // nothing was set up, so there is nothing to remove
+      return;
+    }
+    delete(testWorkDir);
+  }
@SuppressWarnings("rawtypes")
@Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher
when(container.getNodeId()).thenReturn(nodeId);
return container;
}
+
+
+  /**
+   * Regression test for MAPREDUCE-5952: renameMapOutputForReduce must work
+   * when the map output file and its index file were allocated in
+   * different local dirs.
+   */
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // Point LOCAL_DIR at a single, different directory for each allocation
+    // so that the output file and its index end up under distinct parents.
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0]);
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1]);
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    // create empty placeholder files at both allocated locations
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    // Throws IOException if either file cannot be renamed.
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+
+    // A successful rename must leave nothing behind at the old locations.
+    final FileSystem localFs = FileSystem.getLocal(conf);
+    Assert.assertFalse("Map output was not moved: " + mapOut,
+        localFs.exists(mapOut));
+    Assert.assertFalse("Map output index was not moved: " + mapOutIdx,
+        localFs.exists(mapOutIdx));
+  }
}