You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2014/03/14 01:30:35 UTC
svn commit: r1577391 -
/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
Author: cdouglas
Date: Fri Mar 14 00:30:35 2014
New Revision: 1577391
URL: http://svn.apache.org/r1577391
Log:
YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java Fri Mar 14 00:30:35 2014
@@ -18,19 +18,26 @@
package org.apache.hadoop.mapred;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,7 +45,6 @@ import org.apache.hadoop.mapreduce.filec
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -89,8 +95,26 @@ public class TestLocalDistributedCacheMa
public void cleanup() throws Exception {
delete(localDir);
}
-
- @SuppressWarnings("rawtypes")
+
+ /**
+ * In-memory stream over a byte array that also declares Seekable and
+ * PositionedReadable so that it can be wrapped in an FSDataInputStream.
+ */
+ private static class MockInputStream extends ByteArrayInputStream
+ implements Seekable, PositionedReadable {
+ public MockInputStream(byte[] buf) {
+ super(buf);
+ }
+
+ // Stubs only: positioned reads, seek and getPos below ignore the buffer and return fixed values.
+ public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+ public void readFully(long position, byte[] buffer, int offset, int length) {}
+ public void readFully(long position, byte[] buffer) {}
+ public void seek(long position) {}
+ public long getPos() { return 0; }
+ public boolean seekToNewSource(long targetPos) { return false; }
+ }
+
@Test
public void testDownload() throws Exception {
JobConf conf = new JobConf();
@@ -123,28 +147,22 @@ public class TestLocalDistributedCacheMa
}
}
});
-
- doAnswer(new Answer() {
+
+ when(mockfs.getConf()).thenReturn(conf);
+ final FSDataInputStream in =
+ new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+ when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
- public Object answer(InvocationOnMock args) throws Throwable {
- //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
- Path src = (Path)args.getArguments()[1];
- Path dst = (Path)args.getArguments()[2];
- if("file.txt".equals(src.getName())) {
- File f = new File(dst.toUri().getPath());
- FileWriter writer = new FileWriter(f);
- try {
- writer.append("This is a test file\n");
- } finally {
- if(writer != null) writer.close();
- }
+ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+ Path src = (Path)args.getArguments()[0];
+ if ("file.txt".equals(src.getName())) {
+ return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
}
- return null;
}
- }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+ });
+
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
@@ -159,8 +177,7 @@ public class TestLocalDistributedCacheMa
}
assertFalse(link.exists());
}
-
- @SuppressWarnings("rawtypes")
+
@Test
public void testEmptyDownload() throws Exception {
JobConf conf = new JobConf();
@@ -184,16 +201,16 @@ public class TestLocalDistributedCacheMa
throw new FileNotFoundException(p+" not supported by mocking");
}
});
-
- doAnswer(new Answer() {
+
+ when(mockfs.getConf()).thenReturn(conf);
+ when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
- public Object answer(InvocationOnMock args) throws Throwable {
- //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
- Path src = (Path)args.getArguments()[1];
+ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+ Path src = (Path)args.getArguments()[0];
throw new FileNotFoundException(src+" not supported by mocking");
}
- }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+ });
+
conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -203,9 +220,8 @@ public class TestLocalDistributedCacheMa
manager.close();
}
}
-
-
- @SuppressWarnings("rawtypes")
+
+
@Test
public void testDuplicateDownload() throws Exception {
JobConf conf = new JobConf();
@@ -238,28 +254,22 @@ public class TestLocalDistributedCacheMa
}
}
});
-
- doAnswer(new Answer() {
+
+ when(mockfs.getConf()).thenReturn(conf);
+ final FSDataInputStream in =
+ new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+ when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
@Override
- public Object answer(InvocationOnMock args) throws Throwable {
- //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
- Path src = (Path)args.getArguments()[1];
- Path dst = (Path)args.getArguments()[2];
- if("file.txt".equals(src.getName())) {
- File f = new File(dst.toUri().getPath());
- FileWriter writer = new FileWriter(f);
- try {
- writer.append("This is a test file\n");
- } finally {
- if(writer != null) writer.close();
- }
+ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+ Path src = (Path)args.getArguments()[0];
+ if ("file.txt".equals(src.getName())) {
+ return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
}
- return null;
}
- }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-
+ });
+
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");