You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2014/03/14 01:30:35 UTC

svn commit: r1577391 - /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

Author: cdouglas
Date: Fri Mar 14 00:30:35 2014
New Revision: 1577391

URL: http://svn.apache.org/r1577391
Log:
YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee


Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java?rev=1577391&r1=1577390&r2=1577391&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java Fri Mar 14 00:30:35 2014
@@ -18,19 +18,26 @@
 
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,7 +45,6 @@ import org.apache.hadoop.mapreduce.filec
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -89,8 +95,26 @@ public class TestLocalDistributedCacheMa
   public void cleanup() throws Exception {
     delete(localDir);
   }
-  
-  @SuppressWarnings("rawtypes")
+
+  /**
+   * In-memory stream over a byte array that also declares {@link Seekable} and
+   * {@link PositionedReadable} so that it can be wrapped in a FSDataInputStream.
+   */
+  private static class MockInputStream extends ByteArrayInputStream
+      implements Seekable, PositionedReadable {
+    public MockInputStream(byte[] buf) {
+      super(buf);
+    }
+
+    // Stubs for unused methods: they ignore their arguments and never touch the underlying buffer.
+    public int read(long position, byte[] buffer, int offset, int length) { return -1; }
+    public void readFully(long position, byte[] buffer, int offset, int length) {}
+    public void readFully(long position, byte[] buffer) {}
+    public void seek(long position) {}
+    public long getPos() { return 0; } // constant; does not reflect how much has been read
+    public boolean seekToNewSource(long targetPos) { return false; }
+  }
+
   @Test
   public void testDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -123,28 +147,22 @@ public class TestLocalDistributedCacheMa
         }
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
           throw new FileNotFoundException(src+" not supported by mocking");
         }
-        return null;
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
@@ -159,8 +177,7 @@ public class TestLocalDistributedCacheMa
     }
     assertFalse(link.exists());
   }
-  
-  @SuppressWarnings("rawtypes")
+
   @Test
   public void testEmptyDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -184,16 +201,16 @@ public class TestLocalDistributedCacheMa
         throw new FileNotFoundException(p+" not supported by mocking");
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
         throw new FileNotFoundException(src+" not supported by mocking");
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     conf.set(MRJobConfig.CACHE_FILES, "");
     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -203,9 +220,8 @@ public class TestLocalDistributedCacheMa
       manager.close();
     }
   }
-  
-  
-  @SuppressWarnings("rawtypes")
+
+
   @Test
   public void testDuplicateDownload() throws Exception {
     JobConf conf = new JobConf();
@@ -238,28 +254,22 @@ public class TestLocalDistributedCacheMa
         }
       }
     });
-    
-    doAnswer(new Answer() {
+
+    when(mockfs.getConf()).thenReturn(conf);
+    final FSDataInputStream in =
+        new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
+    when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
       @Override
-      public Object answer(InvocationOnMock args) throws Throwable {
-        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
-        Path src = (Path)args.getArguments()[1];
-        Path dst = (Path)args.getArguments()[2];
-        if("file.txt".equals(src.getName())) {
-          File f = new File(dst.toUri().getPath());
-          FileWriter writer = new FileWriter(f);
-          try {
-            writer.append("This is a test file\n");
-          } finally {
-            if(writer != null) writer.close();
-          }
+      public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
+        Path src = (Path)args.getArguments()[0];
+        if ("file.txt".equals(src.getName())) {
+          return in;
         } else {
           throw new FileNotFoundException(src+" not supported by mocking");
         }
-        return null;
       }
-    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
-    
+    });
+
     DistributedCache.addCacheFile(file, conf);
     DistributedCache.addCacheFile(file, conf);
     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");