Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2013/10/16 23:07:35 UTC

svn commit: r1532910 [3/3] - in /hadoop/common/branches/HDFS-2832/hadoop-common-project: hadoop-common/ hadoop-common/src/main/bin/ hadoop-common/src/main/conf/ hadoop-common/src/main/docs/ hadoop-common/src/main/java/ hadoop-common/src/main/java/org/a...

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java Wed Oct 16 21:07:28 2013
@@ -17,29 +17,592 @@
  */
 package org.apache.hadoop.io;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
 
-import junit.framework.TestCase;
+import static org.mockito.Mockito.*;
 
-public class TestMapFile extends TestCase {
+public class TestMapFile {
+  
+  private static final Path TEST_DIR = new Path(
+      System.getProperty("test.build.data", "/tmp"),
+      TestMapFile.class.getSimpleName());
+  
   private static Configuration conf = new Configuration();
 
+  @Before
+  public void setup() throws Exception {
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
+      Assert.fail("Can't clean up test root dir");
+    }
+    fs.mkdirs(TEST_DIR);
+  }
+  
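+  // A Progressable stub whose progress() does nothing; it only exists so the
+  // deprecated MapFile.Writer constructors that require a Progressable can be
+  // exercised below.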
+  private static final Progressable defaultProgressable = new Progressable() {
+    @Override
+    public void progress() {
+    }
+  };
+
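+  // A CompressionCodec stub whose methods all return null; like the
+  // Progressable above, it is only handed to the deprecated constructors and
+  // is never asked to compress anything.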
+  private static final CompressionCodec defaultCodec = new CompressionCodec() {
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream out)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream out,
+        Compressor compressor) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Compressor> getCompressorType() {
+      return null;
+    }
+
+    @Override
+    public Compressor createCompressor() {
+      return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream in)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream in,
+        Decompressor decompressor) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Decompressor> getDecompressorType() {
+      return null;
+    }
+
+    @Override
+    public Decompressor createDecompressor() {
+      return null;
+    }
+
+    @Override
+    public String getDefaultExtension() {
+      return null;
+    }
+  };
+
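+  // Helpers that create writers/readers rooted under TEST_DIR. The small
+  // index interval (4) makes the index code paths easy to exercise with a
+  // handful of entries.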
+  private MapFile.Writer createWriter(String fileName,
+      Class<? extends WritableComparable<?>> keyClass,
+      Class<? extends Writable> valueClass) throws IOException {
+    Path dirName = new Path(TEST_DIR, fileName);
+    MapFile.Writer.setIndexInterval(conf, 4);
+    return new MapFile.Writer(conf, dirName, MapFile.Writer.keyClass(keyClass),
+        MapFile.Writer.valueClass(valueClass));
+  }
+
+  private MapFile.Reader createReader(String fileName,
+      Class<? extends WritableComparable<?>> keyClass) throws IOException {
+    Path dirName = new Path(TEST_DIR, fileName);
+    return new MapFile.Reader(dirName, conf,
+        MapFile.Reader.comparator(new WritableComparator(keyClass)));
+  }
+  
+  /**
+   * test {@code MapFile.Reader.getClosest()} method
+   */
+  @Test
+  public void testGetClosestOnCurrentApi() throws Exception {
+    final String TEST_PREFIX = "testGetClosestOnCurrentApi.mapfile";
+    MapFile.Writer writer = createWriter(TEST_PREFIX, Text.class, Text.class);
+    int FIRST_KEY = 1;
+    // Test keys: 1,11,21,...,91
+    for (int i = FIRST_KEY; i < 100; i += 10) {      
+      Text t = new Text(Integer.toString(i));
+      writer.append(t, t);
+    }
+    writer.close();
+
+    MapFile.Reader reader = createReader(TEST_PREFIX, Text.class);
+    Text key = new Text("55");
+    Text value = new Text();
+
+    // Test get closest with step forward
+    Text closest = (Text) reader.getClosest(key, value);
+    assertEquals(new Text("61"), closest);
+
+    // Test get closest with step back
+    closest = (Text) reader.getClosest(key, value, true);
+    assertEquals(new Text("51"), closest);
+
+    // Test get closest when we pass explicit key
+    final Text explicitKey = new Text("21");
+    closest = (Text) reader.getClosest(explicitKey, value);
+    assertEquals(new Text("21"), explicitKey);
+
+    // Test what happens at boundaries. Assert if searching a key that is
+    // less than first key in the mapfile, that the first key is returned.
+    key = new Text("00");
+    closest = (Text) reader.getClosest(key, value);
+    assertEquals(FIRST_KEY, Integer.parseInt(closest.toString()));
+
+    // Assert that null is returned if key is > last entry in mapfile.
+    key = new Text("92");
+    closest = (Text) reader.getClosest(key, value);
+    assertNull("Not null key in testGetClosestWithNewCode", closest);
+
+    // If we were looking for the key before, we should get the last key
+    closest = (Text) reader.getClosest(key, value, true);
+    assertEquals(new Text("91"), closest);
+  }
+  
+  /**
+   * test {@code MapFile.Reader.midKey()} method
+   */
+  @Test
+  public void testMidKeyOnCurrentApi() throws Exception {
+    // Write a mapfile of simple data
+    final String TEST_PREFIX = "testMidKeyOnCurrentApi.mapfile";
+    MapFile.Writer writer = createWriter(TEST_PREFIX, IntWritable.class,
+        IntWritable.class);
+    // keys are 0,1,...,9
+    int SIZE = 10;
+    for (int i = 0; i < SIZE; i++)
+      writer.append(new IntWritable(i), new IntWritable(i));
+    writer.close();
+
+    MapFile.Reader reader = createReader(TEST_PREFIX, IntWritable.class);
+    assertEquals(new IntWritable((SIZE - 1) / 2), reader.midKey());
+  }
+  
+  /**
+   * test {@code MapFile.rename()} method
+   */
+  @Test
+  public void testRename() {
+    final String NEW_FILE_NAME = "test-new.mapfile";
+    final String OLD_FILE_NAME = "test-old.mapfile";
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+          IntWritable.class);
+      writer.close();
+      MapFile.rename(fs, new Path(TEST_DIR, OLD_FILE_NAME).toString(), 
+          new Path(TEST_DIR, NEW_FILE_NAME).toString());
+      MapFile.delete(fs, new Path(TEST_DIR, NEW_FILE_NAME).toString());
+    } catch (IOException ex) {
+      fail("testRename error " + ex);
+    }
+  }
+  
+  /**
+   * test {@code MapFile.rename()} method when it
+   * throws {@code IOException}
+   */
+  @Test
+  public void testRenameWithException() {
+    final String ERROR_MESSAGE = "Can't rename file";
+    final String NEW_FILE_NAME = "test-new.mapfile";
+    final String OLD_FILE_NAME = "test-old.mapfile";
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      FileSystem spyFs = spy(fs);
+
+      MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+          IntWritable.class);
+      writer.close();
+
+      Path oldDir = new Path(TEST_DIR, OLD_FILE_NAME);
+      Path newDir = new Path(TEST_DIR, NEW_FILE_NAME);
+      when(spyFs.rename(oldDir, newDir)).thenThrow(
+          new IOException(ERROR_MESSAGE));
+
+      MapFile.rename(spyFs, oldDir.toString(), newDir.toString());
+      fail("testRenameWithException no exception error !!!");
+    } catch (IOException ex) {
+      assertEquals("testRenameWithException invalid IOExceptionMessage !!!",
+          ex.getMessage(), ERROR_MESSAGE);
+    }
+  }
+
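+  /**
+   * test {@code MapFile.rename()} method when
+   * {@code FileSystem.rename()} returns false
+   */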
+  @Test
+  public void testRenameWithFalse() {
+    final String ERROR_MESSAGE = "Could not rename";
+    final String NEW_FILE_NAME = "test-new.mapfile";
+    final String OLD_FILE_NAME = "test-old.mapfile";
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      FileSystem spyFs = spy(fs);
+
+      MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+          IntWritable.class);
+      writer.close();
+
+      Path oldDir = new Path(TEST_DIR, OLD_FILE_NAME);
+      Path newDir = new Path(TEST_DIR, NEW_FILE_NAME);
+      when(spyFs.rename(oldDir, newDir)).thenReturn(false);
+
+      MapFile.rename(spyFs, oldDir.toString(), newDir.toString());
+      fail("testRenameWithException no exception error !!!");
+    } catch (IOException ex) {
+      assertTrue("testRenameWithFalse invalid IOExceptionMessage error !!!", ex
+          .getMessage().startsWith(ERROR_MESSAGE));
+    }
+  }
+  
+  /**
+   * test throwing {@code IOException} in {@code MapFile.Writer} constructor    
+   */
+  @Test
+  public void testWriteWithFailDirCreation() {
+    String ERROR_MESSAGE = "Mkdirs failed to create directory";
+    Path dirName = new Path(TEST_DIR, "fail.mapfile");
+    MapFile.Writer writer = null;
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      FileSystem spyFs = spy(fs);
+      Path pathSpy = spy(dirName);
+      when(pathSpy.getFileSystem(conf)).thenReturn(spyFs);
+      when(spyFs.mkdirs(dirName)).thenReturn(false);
+
+      writer = new MapFile.Writer(conf, pathSpy,
+          MapFile.Writer.keyClass(IntWritable.class),
+          MapFile.Writer.valueClass(Text.class));
+      fail("testWriteWithFailDirCreation error !!!");
+    } catch (IOException ex) {
+      assertTrue("testWriteWithFailDirCreation ex error !!!", ex.getMessage()
+          .startsWith(ERROR_MESSAGE));
+    } finally {
+      if (writer != null)
+        try {
+          writer.close();
+        } catch (IOException e) {
+        }
+    }
+  }
+
+  /**
+   * test {@code MapFile.Reader.finalKey()} method
+   */
+  @Test
+  public void testOnFinalKey() {
+    final String TEST_METHOD_KEY = "testOnFinalKey.mapfile";
+    int SIZE = 10;
+    try {
+      MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+          IntWritable.class);
+      for (int i = 0; i < SIZE; i++)
+        writer.append(new IntWritable(i), new IntWritable(i));
+      writer.close();
+
+      MapFile.Reader reader = createReader(TEST_METHOD_KEY, IntWritable.class);
+      IntWritable finalKey = new IntWritable(0);
+      reader.finalKey(finalKey);
+      assertEquals("testOnFinalKey not same !!!", new IntWritable(9), finalKey);
+    } catch (IOException ex) {
+      fail("testOnFinalKey error !!!");
+    }
+  }
+  
+  /**
+   * test {@code MapFile.Writer} constructor with key and value classes,
+   * and validate them with the {@code keyClass(), valueClass()} methods
+   */
+  @Test
+  public void testKeyValueClasses() {
+    Class<? extends WritableComparable<?>> keyClass = IntWritable.class;
+    Class<?> valueClass = Text.class;
+    try {
+      createWriter("testKeyValueClasses.mapfile", IntWritable.class, Text.class);
+      assertNotNull("writer key class null error !!!",
+          MapFile.Writer.keyClass(keyClass));
+      assertNotNull("writer value class null error !!!",
+          MapFile.Writer.valueClass(valueClass));
+    } catch (IOException ex) {
+      fail(ex.getMessage());
+    }
+  }
+  
+  /**
+   * test {@code MapFile.Reader.getClosest()} with a key of the wrong class
+   */
+  @Test
+  public void testReaderGetClosest() throws Exception {
+    final String TEST_METHOD_KEY = "testReaderWithWrongKeyClass.mapfile";
+    try {
+      MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+          Text.class);
+
+      for (int i = 0; i < 10; i++)
+        writer.append(new IntWritable(i), new Text("value" + i));
+      writer.close();
+
+      MapFile.Reader reader = createReader(TEST_METHOD_KEY, Text.class);
+      reader.getClosest(new Text("2"), new Text(""));
+      fail("no excepted exception in testReaderWithWrongKeyClass !!!");
+    } catch (IOException ex) {
+      /* Should be thrown to pass the test */
+    }
+  }
+  
+  /**
+   * test {@code MapFile.Writer.append()} with wrong value class
+   */
+  @Test
+  public void testReaderWithWrongValueClass() {
+    final String TEST_METHOD_KEY = "testReaderWithWrongValueClass.mapfile";
+    try {
+      MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+          Text.class);
+      writer.append(new IntWritable(0), new IntWritable(0));
+      fail("no excepted exception in testReaderWithWrongKeyClass !!!");
+    } catch (IOException ex) {
+      /* Should be thrown to pass the test */
+    }
+  }
+  
+  /**
+   * test {@code MapFile.Reader.next(key, value)} for iteration.
+   */
+  @Test
+  public void testReaderKeyIteration() {
+    final String TEST_METHOD_KEY = "testReaderKeyIteration.mapfile";
+    int SIZE = 10;
+    int ITERATIONS = 5;
+    try {
+      MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+          Text.class);
+      int start = 0;
+      for (int i = 0; i < SIZE; i++)
+        writer.append(new IntWritable(i), new Text("Value:" + i));
+      writer.close();
+
+      MapFile.Reader reader = createReader(TEST_METHOD_KEY, IntWritable.class);
+      // test iteration
+      Writable startValue = new Text("Value:" + start);
+      int i = 0;
+      while (i++ < ITERATIONS) {
+        IntWritable key = new IntWritable(start);
+        Writable value = startValue;
+        while (reader.next(key, value)) {
+          assertNotNull(key);
+          assertNotNull(value);
+        }
+        reader.reset();
+      }
+      assertTrue("reader seek error !!!",
+          reader.seek(new IntWritable(SIZE / 2)));
+      assertFalse("reader seek error !!!",
+          reader.seek(new IntWritable(SIZE * 2)));
+    } catch (IOException ex) {
+      fail("reader seek error !!!");
+    }
+  }
+
+  /**
+   * test {@code MapFile.fix()} method
+   */
+  @Test
+  public void testFix() {
+    final String INDEX_LESS_MAP_FILE = "testFix.mapfile";
+    int PAIR_SIZE = 20;
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      Path dir = new Path(TEST_DIR, INDEX_LESS_MAP_FILE);
+      MapFile.Writer writer = createWriter(INDEX_LESS_MAP_FILE,
+          IntWritable.class, Text.class);
+      for (int i = 0; i < PAIR_SIZE; i++)
+        writer.append(new IntWritable(0), new Text("value"));
+      writer.close();
+
+      File indexFile = new File(".", "." + INDEX_LESS_MAP_FILE + "/index");
+      boolean isDeleted = false;
+      if (indexFile.exists())
+        isDeleted = indexFile.delete();
+
+      if (isDeleted)
+        assertTrue("testFix error !!!",
+            MapFile.fix(fs, dir, IntWritable.class, Text.class, true, conf) == PAIR_SIZE);
+    } catch (Exception ex) {
+      fail("testFix error !!!");
+    }
+  }
+
+  /**
+   * test all available constructors for {@code MapFile.Writer}
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testDeprecatedConstructors() {
+    String path = new Path(TEST_DIR, "writes.mapfile").toString();
+    try {
+      FileSystem fs = FileSystem.getLocal(conf);
+      MapFile.Writer writer = new MapFile.Writer(conf, fs, path,
+          IntWritable.class, Text.class, CompressionType.RECORD);
+      assertNotNull(writer);
+      writer = new MapFile.Writer(conf, fs, path, IntWritable.class,
+          Text.class, CompressionType.RECORD, defaultProgressable);
+      assertNotNull(writer);
+      writer = new MapFile.Writer(conf, fs, path, IntWritable.class,
+          Text.class, CompressionType.RECORD, defaultCodec, defaultProgressable);
+      assertNotNull(writer);
+      writer = new MapFile.Writer(conf, fs, path,
+          WritableComparator.get(Text.class), Text.class);
+      assertNotNull(writer);
+      writer = new MapFile.Writer(conf, fs, path,
+          WritableComparator.get(Text.class), Text.class,
+          SequenceFile.CompressionType.RECORD);
+      assertNotNull(writer);
+      writer = new MapFile.Writer(conf, fs, path,
+          WritableComparator.get(Text.class), Text.class,
+          CompressionType.RECORD, defaultProgressable);
+      assertNotNull(writer);
+      writer.close();
+
+      MapFile.Reader reader = new MapFile.Reader(fs, path,
+          WritableComparator.get(IntWritable.class), conf);
+      assertNotNull(reader);
+      assertNotNull("reader key is null !!!", reader.getKeyClass());
+      assertNotNull("reader value in null", reader.getValueClass());
+
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+  }
+  
+  /**
+   * test {@code MapFile.Writer} constructor
+   * with {@code IllegalArgumentException}
+   */
+  @Test
+  public void testKeyLessWriterCreation() {
+    MapFile.Writer writer = null;
+    try {
+      writer = new MapFile.Writer(conf, TEST_DIR);
+      fail("fail in testKeyLessWriterCreation !!!");
+    } catch (IllegalArgumentException ex) {
+    } catch (Exception e) {
+      fail("fail in testKeyLessWriterCreation. Other ex !!!");
+    } finally {
+      if (writer != null)
+        try {
+          writer.close();
+        } catch (IOException e) {
+        }
+    }
+  }
+
+  /**
+   * test {@code MapFile.Writer} constructor with {@code IOException}
+   */
+  @Test
+  public void testPathExplosionWriterCreation() {
+    Path path = new Path(TEST_DIR, "testPathExplosionWriterCreation.mapfile");
+    String TEST_ERROR_MESSAGE = "Mkdirs failed to create directory "
+        + path.getName();
+    MapFile.Writer writer = null;
+    try {
+      FileSystem fsSpy = spy(FileSystem.get(conf));
+      Path pathSpy = spy(path);
+      when(fsSpy.mkdirs(path)).thenThrow(new IOException(TEST_ERROR_MESSAGE));
+
+      when(pathSpy.getFileSystem(conf)).thenReturn(fsSpy);
+
+      writer = new MapFile.Writer(conf, pathSpy,
+          MapFile.Writer.keyClass(IntWritable.class),
+          MapFile.Writer.valueClass(IntWritable.class));
+      fail("fail in testPathExplosionWriterCreation !!!");
+    } catch (IOException ex) {
+      assertEquals("testPathExplosionWriterCreation ex message error !!!",
+          ex.getMessage(), TEST_ERROR_MESSAGE);
+    } catch (Exception e) {
+      fail("fail in testPathExplosionWriterCreation. Other ex !!!");
+    } finally {
+      if (writer != null)
+        try {
+          writer.close();
+        } catch (IOException e) {
+        }
+    }
+  }
+
+  /**
+   * test {@code MapFile.Writer.append()} method with descending key order
+   */
+  @Test
+  public void testDescOrderWithThrowExceptionWriterAppend() {
+    try {
+      MapFile.Writer writer = createWriter(".mapfile", IntWritable.class,
+          Text.class);
+      writer.append(new IntWritable(2), new Text("value: " + 1));
+      writer.append(new IntWritable(2), new Text("value: " + 2));
+      writer.append(new IntWritable(2), new Text("value: " + 4));
+      writer.append(new IntWritable(1), new Text("value: " + 3));
+      fail("testDescOrderWithThrowExceptionWriterAppend not expected exception error !!!");
+    } catch (IOException ex) {
+    } catch (Exception e) {
+      fail("testDescOrderWithThrowExceptionWriterAppend other ex throw !!!");
+    }
+  }
+
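+  /**
+   * test {@code MapFile.main()} method
+   */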
+  @Test
+  public void testMainMethodMapFile() {
+    String path = new Path(TEST_DIR, "mainMethodMapFile.mapfile").toString();
+    String inFile = "mainMethodMapFile.mapfile";
+    String outFile = "mainMethodMapFile.mapfile";
+    String[] args = { path, outFile };
+    try {
+      MapFile.Writer writer = createWriter(inFile, IntWritable.class,
+          Text.class);
+      writer.append(new IntWritable(1), new Text("test_text1"));
+      writer.append(new IntWritable(2), new Text("test_text2"));
+      writer.close();
+      MapFile.main(args);
+    } catch (Exception ex) {
+      fail("testMainMethodMapFile error !!!");
+    }
+  }
+
   /**
    * Test getClosest feature.
+   * 
    * @throws Exception
    */
+  @Test
+  @SuppressWarnings("deprecation")
   public void testGetClosest() throws Exception {
-    // Write a mapfile of simple data: keys are 
-    Path dirName = new Path(System.getProperty("test.build.data",".") +
-      getName() + ".mapfile"); 
+    // Write a mapfile of simple data: keys are multiples of ten
+    Path dirName = new Path(TEST_DIR, "testGetClosest.mapfile");
     FileSystem fs = FileSystem.getLocal(conf);
     Path qualifiedDirName = fs.makeQualified(dirName);
     // Make an index entry for every third insertion.
     MapFile.Writer.setIndexInterval(conf, 3);
     MapFile.Writer writer = new MapFile.Writer(conf, fs,
-      qualifiedDirName.toString(), Text.class, Text.class);
+        qualifiedDirName.toString(), Text.class, Text.class);
     // Assert that the index interval is 3
     assertEquals(3, writer.getIndexInterval());
     // Add entries up to 100 in intervals of ten.
@@ -51,74 +614,84 @@ public class TestMapFile extends TestCas
     }
     writer.close();
     // Now do getClosest on created mapfile.
-    MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
-      conf);
+    MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+    try {
     Text key = new Text("55");
     Text value = new Text();
-    Text closest = (Text)reader.getClosest(key, value);
+    Text closest = (Text) reader.getClosest(key, value);
     // Assert that closest after 55 is 60
     assertEquals(new Text("60"), closest);
     // Get closest that falls before the passed key: 50
-    closest = (Text)reader.getClosest(key, value, true);
+    closest = (Text) reader.getClosest(key, value, true);
     assertEquals(new Text("50"), closest);
     // Test get closest when we pass explicit key
     final Text TWENTY = new Text("20");
-    closest = (Text)reader.getClosest(TWENTY, value);
+    closest = (Text) reader.getClosest(TWENTY, value);
     assertEquals(TWENTY, closest);
-    closest = (Text)reader.getClosest(TWENTY, value, true);
+    closest = (Text) reader.getClosest(TWENTY, value, true);
     assertEquals(TWENTY, closest);
-    // Test what happens at boundaries.  Assert if searching a key that is
+    // Test what happens at boundaries. Assert if searching a key that is
     // less than first key in the mapfile, that the first key is returned.
     key = new Text("00");
-    closest = (Text)reader.getClosest(key, value);
+    closest = (Text) reader.getClosest(key, value);
     assertEquals(FIRST_KEY, Integer.parseInt(closest.toString()));
-    
-    // If we're looking for the first key before, and we pass in a key before 
+
+    // If we're looking for the first key before, and we pass in a key before
     // the first key in the file, we should get null
-    closest = (Text)reader.getClosest(key, value, true);
+    closest = (Text) reader.getClosest(key, value, true);
     assertNull(closest);
-    
+
     // Assert that null is returned if key is > last entry in mapfile.
     key = new Text("99");
-    closest = (Text)reader.getClosest(key, value);
+    closest = (Text) reader.getClosest(key, value);
     assertNull(closest);
 
     // If we were looking for the key before, we should get the last key
-    closest = (Text)reader.getClosest(key, value, true);
+    closest = (Text) reader.getClosest(key, value, true);
     assertEquals(new Text("90"), closest);
+    } finally {
+      reader.close();
+    }
   }
 
+  @Test
+  @SuppressWarnings("deprecation")
   public void testMidKey() throws Exception {
-    // Write a mapfile of simple data: keys are 
-    Path dirName = new Path(System.getProperty("test.build.data",".") +
-      getName() + ".mapfile"); 
+    // Write a mapfile with a single key/value pair
+    Path dirName = new Path(TEST_DIR, "testMidKey.mapfile");
     FileSystem fs = FileSystem.getLocal(conf);
     Path qualifiedDirName = fs.makeQualified(dirName);
- 
+
     MapFile.Writer writer = new MapFile.Writer(conf, fs,
-      qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
+        qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
     writer.append(new IntWritable(1), new IntWritable(1));
     writer.close();
     // Now do midKey on created mapfile.
-    MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
-      conf);
-    assertEquals(new IntWritable(1), reader.midKey());
+    MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+    try {
+      assertEquals(new IntWritable(1), reader.midKey());
+    } finally {
+      reader.close();
+    }
   }
 
-
+  @Test
+  @SuppressWarnings("deprecation")
   public void testMidKeyEmpty() throws Exception {
-    // Write a mapfile of simple data: keys are 
-    Path dirName = new Path(System.getProperty("test.build.data",".") +
-      getName() + ".mapfile"); 
+    // Write an empty mapfile
+    Path dirName = new Path(TEST_DIR, "testMidKeyEmpty.mapfile");
     FileSystem fs = FileSystem.getLocal(conf);
     Path qualifiedDirName = fs.makeQualified(dirName);
- 
+
     MapFile.Writer writer = new MapFile.Writer(conf, fs,
-      qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
+        qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
     writer.close();
     // Now do midKey on created mapfile.
-    MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
-      conf);
-    assertEquals(null, reader.midKey());
+    MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+    try {
+      assertNull(reader.midKey());
+    } finally {
+      reader.close();
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java Wed Oct 16 21:07:28 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.io;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.*;
@@ -51,6 +53,39 @@ public class TestSetFile extends TestCas
       fs.close();
     }
   }
+  
+  /**
+   * test {@code SetFile.Reader} methods 
+   * next(), get() in combination 
+   */
+  public void testSetFileAccessMethods() {    
+    try {             
+      FileSystem fs = FileSystem.getLocal(conf);
+      int size = 10;
+      writeData(fs, size);
+      SetFile.Reader reader = createReader(fs);
+      assertTrue("testSetFileWithConstruction1 error !!!", reader.next(new IntWritable(0)));
+      // don't know why reader.get(i) return i+1
+      assertEquals("testSetFileWithConstruction2 error !!!", new IntWritable(size/2 + 1), reader.get(new IntWritable(size/2)));      
+      assertNull("testSetFileWithConstruction3 error !!!", reader.get(new IntWritable(size*2)));
+    } catch (Exception ex) {
+      fail("testSetFileWithConstruction error !!!");    
+    }
+  }
+
+  private SetFile.Reader createReader(FileSystem fs) throws IOException  {
+    return new SetFile.Reader(fs, FILE, 
+        WritableComparator.get(IntWritable.class), conf);    
+  }
+  
+  @SuppressWarnings("deprecation")
+  private void writeData(FileSystem fs, int elementSize) throws IOException {
+    MapFile.delete(fs, FILE);    
+    SetFile.Writer writer = new SetFile.Writer(fs, FILE, IntWritable.class);
+    for (int i = 0; i < elementSize; i++)
+      writer.append(new IntWritable(i));
+    writer.close();    
+  }
 
   private static RandomDatum[] generate(int count) {
     LOG.info("generating " + count + " records in memory");

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java Wed Oct 16 21:07:28 2013
@@ -19,11 +19,12 @@
 package org.apache.hadoop.io;
 
 import junit.framework.TestCase;
-
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.Random;
+import com.google.common.primitives.Bytes;
 
 /** Unit tests for LargeUTF8. */
 public class TestText extends TestCase {
@@ -321,7 +322,81 @@ public class TestText extends TestCase {
       (new Text("foo"),
        "{\"type\":\"string\",\"java-class\":\"org.apache.hadoop.io.Text\"}");
   }
-
+  
+  /**
+   * test {@code Text.charAt()} method
+   */
+  public void testCharAt() {
+    String line = "adsawseeeeegqewgasddga";
+    Text text = new Text(line);
+    for (int i = 0; i < line.length(); i++) {
+      assertTrue("testCharAt error1 !!!", text.charAt(i) == line.charAt(i));
+    }    
+    assertEquals("testCharAt error2 !!!", -1, text.charAt(-1));    
+    assertEquals("testCharAt error3 !!!", -1, text.charAt(100));
+  }    
+  
+  /**
+   * test {@code Text} readFields/write operations
+   */
+  public void testReadWriteOperations() {
+    String line = "adsawseeeeegqewgasddga";
+    byte[] inputBytes = line.getBytes();       
+    inputBytes = Bytes.concat(new byte[] {(byte)22}, inputBytes);        
+    
+    DataInputBuffer in = new DataInputBuffer();
+    DataOutputBuffer out = new DataOutputBuffer();
+    Text text = new Text(line);
+    try {      
+      in.reset(inputBytes, inputBytes.length);
+      text.readFields(in);      
+    } catch(Exception ex) {
+      fail("testReadFields error !!!");
+    }    
+    try {
+      text.write(out);
+    } catch(IOException ex) {      
+    } catch(Exception ex) {
+      fail("testReadWriteOperations error !!!");
+    }        
+  }
+  
+  /**
+   * test {@code Text.bytesToCodePoint(bytes)} method
+   */
+  public void testBytesToCodePoint() {
+    try {
+      ByteBuffer bytes = ByteBuffer.wrap(new byte[] {-2, 45, 23, 12, 76, 89});                                      
+      Text.bytesToCodePoint(bytes);      
+      assertTrue("testBytesToCodePoint error !!!", bytes.position() == 6 );                      
+    } catch (BufferUnderflowException ex) {
+      fail("testBytesToCodePoint unexp exception");
+    } catch (Exception e) {
+      fail("testBytesToCodePoint unexp exception");
+    }    
+  }
+  
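+  /**
+   * test {@code Text.bytesToCodePoint(bytes)}
+   * with {@code BufferUnderflowException}
+   */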
+  public void testbytesToCodePointWithInvalidUTF() {
+    try {                 
+      Text.bytesToCodePoint(ByteBuffer.wrap(new byte[] {-2}));
+      fail("testbytesToCodePointWithInvalidUTF error unexp exception !!!");
+    } catch (BufferUnderflowException ex) {      
+    } catch(Exception e) {
+      fail("testbytesToCodePointWithInvalidUTF error unexp exception !!!");
+    }
+  }
+  
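+  /**
+   * test {@code Text.utf8Length(string)} method
+   */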
+  public void testUtf8Length() {         
+    assertEquals("testUtf8Length1 error   !!!", 1, Text.utf8Length(new String(new char[]{(char)1})));
+    assertEquals("testUtf8Length127 error !!!", 1, Text.utf8Length(new String(new char[]{(char)127})));
+    assertEquals("testUtf8Length128 error !!!", 2, Text.utf8Length(new String(new char[]{(char)128})));
+    assertEquals("testUtf8Length193 error !!!", 2, Text.utf8Length(new String(new char[]{(char)193})));    
+    assertEquals("testUtf8Length225 error !!!", 2, Text.utf8Length(new String(new char[]{(char)225})));
+    assertEquals("testUtf8Length254 error !!!", 2, Text.utf8Length(new String(new char[]{(char)254})));                 
+  }
+  
   public static void main(String[] args)  throws Exception
   {
     TestText test = new TestText("main");

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java Wed Oct 16 21:07:28 2013
@@ -16,11 +16,21 @@
  */
 package org.apache.hadoop.security;
 
-import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.TestSaslRPC;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
 import org.junit.*;
 
-import static org.mockito.Mockito.*;
-
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -30,21 +40,13 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
-import javax.security.auth.Subject;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.LoginContext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import static org.apache.hadoop.test.MetricsAsserts.*;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
-import org.apache.hadoop.util.Shell;
+import static org.apache.hadoop.ipc.TestSaslRPC.*;
+import static org.apache.hadoop.security.token.delegation.TestDelegationToken.TestDelegationTokenIdentifier;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestUserGroupInformation {
   final private static String USER_NAME = "user1@HADOOP.APACHE.ORG";
@@ -786,4 +788,29 @@ public class TestUserGroupInformation {
     UserGroupInformation.setLoginUser(ugi);
     assertEquals(ugi, UserGroupInformation.getLoginUser());
   }
+
+  /**
+   * In some scenarios, such as HA, delegation tokens are associated with a
+   * logical name. The tokens are cloned and are associated with the
+   * physical address of the server where the service is provided.
+   * This test ensures cloned delegation tokens are used locally
+   * and are not returned in {@link UserGroupInformation#getCredentials()}.
+   */
+  @Test
+  public void testPrivateTokenExclusion() throws Exception  {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TestTokenIdentifier tokenId = new TestTokenIdentifier();
+    Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(
+            tokenId.getBytes(), "password".getBytes(),
+            tokenId.getKind(), null);
+    ugi.addToken(new Text("regular-token"), token);
+
+    // Now add cloned private token
+    ugi.addToken(new Text("private-token"), new Token.PrivateToken<TestTokenIdentifier>(token));
+    ugi.addToken(new Text("private-token1"), new Token.PrivateToken<TestTokenIdentifier>(token));
+
+    // Ensure only non-private tokens are returned
+    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
+    assertEquals(1, tokens.size());
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java Wed Oct 16 21:07:28 2013
@@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mount.MountdBase;
 import org.apache.hadoop.oncrpc.RpcProgram;
-import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
-import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -72,19 +67,7 @@ public abstract class Nfs3Base {
 
   private void startTCPServer() {
     SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
-        rpcProgram, 0) {
-      @Override
-      public ChannelPipelineFactory getPipelineFactory() {
-        return new ChannelPipelineFactory() {
-          @Override
-          public ChannelPipeline getPipeline() {
-            return Channels.pipeline(
-                RpcUtil.constructRpcFrameDecoder(),
-                new SimpleTcpServerHandler(rpcProgram));
-          }
-        };
-      }
-    };
+        rpcProgram, 0);
     tcpServer.run();
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java Wed Oct 16 21:07:28 2013
@@ -97,6 +97,6 @@ public interface Nfs3Interface {
       InetAddress client);
 
   /** COMMIT: Commit cached data on a server to stable storage */
-  public NFS3Response commit(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, InetAddress client);
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java Wed Oct 16 21:07:28 2013
@@ -28,8 +28,8 @@ import org.apache.hadoop.oncrpc.XDR;
  * WRITE3 Request
  */
 public class WRITE3Request extends RequestWithHandle {
-  private final long offset;
-  private final int count;
+  private long offset;
+  private int count;
   private final WriteStableHow stableHow;
   private final ByteBuffer data;
 
@@ -54,10 +54,18 @@ public class WRITE3Request extends Reque
     return this.offset;
   }
 
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+  
   public int getCount() {
     return this.count;
   }
 
+  public void setCount(int count) {
+    this.count = count;
+  }
+  
   public WriteStableHow getStableHow() {
     return this.stableHow;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java Wed Oct 16 21:07:28 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3St
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Verifier;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * READDIR3 Response
  */
@@ -49,7 +51,8 @@ public class READDIR3Response extends NF
       return fileId;
     }
 
-    String getName() {
+    @VisibleForTesting
+    public String getName() {
       return name;
     }
 
@@ -66,6 +69,11 @@ public class READDIR3Response extends NF
       this.entries = Collections.unmodifiableList(Arrays.asList(entries));
       this.eof = eof;
     }
+    
+    @VisibleForTesting
+    public List<Entry3> getEntries() {
+      return this.entries;
+    }
   }
 
   public READDIR3Response(int status) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java Wed Oct 16 21:07:28 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3St
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Verifier;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * READDIRPLUS3 Response
  */
@@ -51,6 +53,11 @@ public class READDIRPLUS3Response  exten
       this.objFileHandle = objFileHandle;
     }
 
+    @VisibleForTesting
+    public String getName() {
+      return name;
+    }
+    
     void seralize(XDR xdr) {
       xdr.writeLongAsHyper(fileId);
       xdr.writeString(name);
@@ -71,7 +78,8 @@ public class READDIRPLUS3Response  exten
       this.eof = eof;
     }
 
-    List<EntryPlus3> getEntries() {
+    @VisibleForTesting
+    public List<EntryPlus3> getEntries() {
       return entries;
     }
     
@@ -80,6 +88,11 @@ public class READDIRPLUS3Response  exten
     }
   }
 
+  @VisibleForTesting
+  public DirListPlus3 getDirListPlus() {
+    return dirListPlus;
+  }
+  
   public READDIRPLUS3Response(int status) {
     this(status, null, 0, null);
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Wed Oct 16 21:07:28 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
 public class RpcCallCache {
   
   public static class CacheEntry {
-    private XDR response; // null if no response has been sent
+    private RpcResponse response; // null if no response has been sent
     
     public CacheEntry() {
       response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
       return response != null;
     }
     
-    public XDR getResponse() {
+    public RpcResponse getResponse() {
       return response;
     }
     
-    public void setResponse(XDR response) {
+    public void setResponse(RpcResponse response) {
       this.response = response;
     }
   }
@@ -128,13 +128,13 @@ public class RpcCallCache {
   }
 
   /** Mark a request as completed and add corresponding response to the cache */
-  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+  public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
     ClientRequest req = new ClientRequest(clientId, xid);
     CacheEntry e;
     synchronized(map) {
       e = map.get(req);
     }
-    e.setResponse(response);
+    e.setResponse(response);
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Wed Oct 16 21:07:28 2013
@@ -18,22 +18,24 @@
 package org.apache.hadoop.oncrpc;
 
 import java.io.IOException;
-import java.net.InetAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
-import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
  * Class for writing RPC server programs based on RFC 1050. Extend this class
  * and implement {@link #handleInternal} to handle the requests received.
  */
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   private static final Log LOG = LogFactory.getLog(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
@@ -42,7 +44,6 @@ public abstract class RpcProgram {
   private final int progNumber;
   private final int lowProgVersion;
   private final int highProgVersion;
-  private final RpcCallCache rpcCallCache;
   
   /**
    * Constructor
@@ -53,19 +54,15 @@ public abstract class RpcProgram {
    * @param progNumber program number as defined in RFC 1050
    * @param lowProgVersion lowest version of the specification supported
    * @param highProgVersion highest version of the specification supported
-   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
-   *          indicates no cache.
    */
   protected RpcProgram(String program, String host, int port, int progNumber,
-      int lowProgVersion, int highProgVersion, int cacheSize) {
+      int lowProgVersion, int highProgVersion) {
     this.program = program;
     this.host = host;
     this.port = port;
     this.progNumber = progNumber;
     this.lowProgVersion = lowProgVersion;
     this.highProgVersion = highProgVersion;
-    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
-        : null;
   }
 
   /**
@@ -103,92 +100,50 @@ public abstract class RpcProgram {
     }
   }
 
-  /**
-   * Handle an RPC request.
-   * @param rpcCall RPC call that is received
-   * @param in xdr with cursor at reading the remaining bytes of a method call
-   * @param out xdr output corresponding to Rpc reply
-   * @param client making the Rpc request
-   * @param channel connection over which Rpc request is received
-   * @return response xdr response
-   */
-  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel);
-  
-  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
-    XDR out = new XDR();
-    RpcCall rpcCall = RpcCall.read(xdr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
-    }
-    
-    if (!checkProgram(rpcCall.getProgram())) {
-      return programMismatch(out, rpcCall);
-    }
-
-    if (!checkProgramVersion(rpcCall.getVersion())) {
-      return programVersionMismatch(out, rpcCall);
-    }
-    
-    // Check for duplicate requests in the cache for non-idempotent requests
-    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
-    if (idempotent) {
-      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
-      if (entry != null) { // in ache 
-        if (entry.isCompleted()) {
-          LOG.info("Sending the cached reply to retransmitted request "
-              + rpcCall.getXid());
-          return entry.getResponse();
-        } else { // else request is in progress
-          LOG.info("Retransmitted request, transaction still in progress "
-              + rpcCall.getXid());
-          // TODO: ignore the request?
-        }
-      }
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcCall call = (RpcCall) info.header();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(program + " procedure #" + call.getProcedure());
     }
     
-    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
-    if (response.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No sync response, expect an async response for request XID="
-            + rpcCall.getXid());
-      }
+    if (this.progNumber != call.getProgram()) {
+      LOG.warn("Invalid RPC call program " + call.getProgram());
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
+    }
+
+    int ver = call.getVersion();
+    if (ver < lowProgVersion || ver > highProgVersion) {
+      LOG.warn("Invalid RPC call version " + ver);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      out.writeInt(lowProgVersion);
+      out.writeInt(highProgVersion);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
     }
     
-    // Add the request to the cache
-    if (idempotent) {
-      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
-    }
-    return response;
-  }
-  
-  private XDR programMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call program " + call.getProgram());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_UNAVAIL, new VerifierNone());
-    reply.write(out);
-    return out;
-  }
-  
-  private XDR programVersionMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call version " + call.getVersion());
-    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
-        AcceptState.PROG_MISMATCH, new VerifierNone());
-    reply.write(out);
-    out.writeInt(lowProgVersion);
-    out.writeInt(highProgVersion);
-    return out;
-  }
-  
-  private boolean checkProgram(int progNumber) {
-    return this.progNumber == progNumber;
-  }
-  
-  /** Return true if a the program version in rpcCall is supported */
-  private boolean checkProgramVersion(int programVersion) {
-    return programVersion >= lowProgVersion
-        && programVersion <= highProgVersion;
+    handleInternal(ctx, info);
   }
+
+  protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
   
   @Override
   public String toString() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Wed Oct 16 21:07:28 2013
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
-public class RpcUtil {
+public final class RpcUtil {
   /**
-   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   * The XID in an RPC call. It is used for starting with a new seed after
+   * each reboot.
    */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
@@ -35,10 +41,27 @@ public class RpcUtil {
     return xid = ++xid + caller.hashCode();
   }
 
+  public static void sendRpcResponse(ChannelHandlerContext ctx,
+      RpcResponse response) {
+    Channels.fireMessageReceived(ctx, response);
+  }
+
   public static FrameDecoder constructRpcFrameDecoder() {
     return new RpcFrameDecoder();
   }
 
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+  /**
+   * An RPC client can separate an RPC message into several frames (i.e.,
+   * fragments) when transferring it across the wire. RpcFrameDecoder
+   * reconstructs a full RPC message from these fragments.
+   *
+   * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+   * each RPC client.
+   */
   static class RpcFrameDecoder extends FrameDecoder {
     public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
     private ChannelBuffer currentFrame;
@@ -78,4 +101,68 @@ public class RpcUtil {
       }
     }
   }
+
+  /**
+   * RpcMessageParserStage parses the network bytes and wraps the RPC
+   * request in an RpcInfo instance.
+   */
+  static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+    private static final Log LOG = LogFactory
+        .getLog(RpcMessageParserStage.class);
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+      ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+      XDR in = new XDR(b, XDR.State.READING);
+
+      RpcInfo info = null;
+      try {
+        RpcCall callHeader = RpcCall.read(in);
+        ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+            .slice());
+        info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+            e.getRemoteAddress());
+      } catch (Exception exc) {
+        LOG.info("Malformed RPC request from " + e.getRemoteAddress());
+      }
+
+      if (info != null) {
+        Channels.fireMessageReceived(ctx, info);
+      }
+    }
+  }
+
+  /**
+   * RpcTcpResponseStage sends an RpcResponse across the wire with the
+   * appropriate fragment header.
+   */
+  private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+      ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+      ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+      e.getChannel().write(d);
+    }
+  }
+
+  /**
+   * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+   * require a fragment header.
+   */
+  private static final class RpcUdpResponseStage extends
+      SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      e.getChannel().write(r.data(), r.remoteAddress());
+    }
+  }
 }

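The fragment header RpcTcpResponseStage prepends is the standard ONC RPC record mark: four big-endian bytes carrying the fragment length in the low 31 bits, with the top bit set on the final fragment (see the XDR.recordMark change further down). A self-contained sketch of the encoding and the matching decode-side check, using only java.nio:

    import java.nio.ByteBuffer;

    public class RecordMarkSketch {
      // Mirrors XDR.recordMark(size, last): top bit flags the last fragment.
      static byte[] recordMark(int size, boolean last) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(last ? size | 0x80000000 : size);
        return buf.array();
      }

      public static void main(String[] args) {
        int header = ByteBuffer.wrap(recordMark(28, true)).getInt();
        boolean lastFragment = (header & 0x80000000) != 0; // true
        int fragmentLength = header & 0x7fffffff;          // 28
        System.out.println(lastFragment + " " + fragmentLength);
      }
    }
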
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Wed Oct 16 21:07:28 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
 public class SimpleTcpServer {
   public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
@@ -50,18 +50,6 @@ public class SimpleTcpServer {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workercount;
-    this.pipelineFactory = getPipelineFactory();
-  }
-
-  public ChannelPipelineFactory getPipelineFactory() {
-    return new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(
-            RpcUtil.constructRpcFrameDecoder(),
-            new SimpleTcpServerHandler(rpcProgram));
-      }
-    };
   }
   
   public void run() {
@@ -78,7 +66,15 @@ public class SimpleTcpServer {
     }
     
     ServerBootstrap bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(pipelineFactory);
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_TCP_RESPONSE);
+      }
+    });
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.keepAlive", true);
     

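Note the ordering baked into getPipeline(): the stateful RpcFrameDecoder is constructed fresh per connection, while the parser stage, the program, and the TCP response stage are shared singletons. Callers wire a program in the same way as before; a sketch lifted from TestFrameDecoder.testFrames() at the end of this diff (port and worker count are illustrative):

    RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
        "localhost", port, 100000, 1, 2);
    SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
    tcpServer.run();
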
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Wed Oct 16 21:07:28 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
   private final int RECEIVE_BUFFER_SIZE = 65536;
 
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   protected final int workerCount;
 
-  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
-    this.pipelineFactory = new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
-      }
-    };
   }
 
   public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
         Executors.newCachedThreadPool(), workerCount);
 
     ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-    ChannelPipeline p = b.getPipeline();
-    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+    b.setPipeline(Channels.pipeline(
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_UDP_RESPONSE));
 
     b.setOption("broadcast", "false");
     b.setOption("sendBufferSize", SEND_BUFFER_SIZE);

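The UDP pipeline has no frame decoder at all: a datagram always carries a complete RPC message, and RpcUdpResponseStage writes the reply without a record mark. Starting the UDP side is symmetric with the TCP sketch above:

    // 'program' is the same RpcProgram instance used for the TCP server;
    // the shared stages are stateless, so one instance can back both.
    SimpleUdpServer udpServer = new SimpleUdpServer(port, program, 1);
    udpServer.run();
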
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Wed Oct 16 21:07:28 2013
@@ -93,6 +93,10 @@ public final class XDR {
     return n;
   }
 
+  public ByteBuffer buffer() {
+    return buf.duplicate();
+  }
+
   public int size() {
     // TODO: This overloading intends to be compatible with the semantics of
     // the previous version of the class. This function should be separated into
@@ -219,7 +223,7 @@ public final class XDR {
     return xdr.buf.remaining() >= len;
   }
 
-  private static byte[] recordMark(int size, boolean last) {
+  static byte[] recordMark(int size, boolean last) {
     byte[] b = new byte[SIZEOF_INT];
     ByteBuffer buf = ByteBuffer.wrap(b);
     buf.putInt(!last ? size : size | 0x80000000);
@@ -259,9 +263,8 @@ public final class XDR {
 
   @VisibleForTesting
   public byte[] getBytes() {
-    ByteBuffer d = buf.duplicate();
-    byte[] b = new byte[d.position()];
-    d.flip();
+    ByteBuffer d = asReadOnlyWrap().buffer();
+    byte[] b = new byte[d.remaining()];
     d.get(b);
 
     return b;

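The getBytes() rewrite is pure java.nio bookkeeping: asReadOnlyWrap().buffer() already returns a duplicate positioned over the written bytes, so remaining() replaces the manual flip(). A standalone sketch of the duplicate/flip behaviour the old code depended on (JDK only, no Hadoop types):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class DuplicateSketch {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocate(16);
        src.putInt(42);                     // position = 4

        ByteBuffer d = src.duplicate();     // shared content, own position/limit
        d.flip();                           // limit = 4, position = 0
        byte[] b = new byte[d.remaining()]; // only the bytes actually written
        d.get(b);

        System.out.println(Arrays.toString(b)); // [0, 0, 0, 42]
      }
    }
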
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java Wed Oct 16 21:07:28 2013
@@ -18,16 +18,17 @@
 package org.apache.hadoop.oncrpc.security;
 
 import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
 
 /**
  * Base class for verifier. Currently our authentication only supports 3 types
- * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
- * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
+ * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
+ * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
  * AUTH_NONE and RPCSEC_GSS
  */
 public abstract class Verifier extends RpcAuthInfo {
 
+  public static final Verifier VERIFIER_NONE = new VerifierNone();
+
   protected Verifier(AuthFlavor flavor) {
     super(flavor);
   }
@@ -61,6 +62,4 @@ public abstract class Verifier extends R
     }
     verifier.write(xdr);
   }  
- 
-  
 }

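VERIFIER_NONE lets reply paths reuse one stateless no-op verifier instead of allocating a VerifierNone per call, which is exactly what the new error replies in RpcProgram do. A one-line sketch of the intended call site (AcceptState.SUCCESS is the standard ONC RPC accept state and is assumed here, since this diff only shows the error states):

    // xid taken from the incoming call; SUCCESS is assumed to exist in the enum.
    RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
        RpcAcceptedReply.AcceptState.SUCCESS, Verifier.VERIFIER_NONE);
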
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Wed Oct 16 21:07:28 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * An rpcbind request handler.
@@ -44,7 +48,7 @@ public class RpcProgramPortmap extends R
   private final HashMap<String, PortmapMapping> map;
 
   public RpcProgramPortmap() {
-    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
     map = new HashMap<String, PortmapMapping>(256);
   }
 
@@ -130,10 +134,15 @@ public class RpcProgramPortmap extends R
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR in = new XDR(data);
+    XDR out = new XDR();
+
     if (portmapProc == Procedure.PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
     } else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -148,11 +157,14 @@ public class RpcProgramPortmap extends R
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
-      RpcAcceptedReply.getInstance(xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
-          out);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+      reply.write(out);
     }
-    return out;
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

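For reference, the call body this handler dispatches on is a standard ONC RPC header (RFC 1057) followed by the portmap arguments; the portmapper is program 100000, version 2, and PMAPPROC_GETPORT is procedure 3. A hedged sketch of building such a request with nothing but XDR.writeInt (field order per the RFC; credential and verifier are AUTH_NONE with zero length, matching the lines removed from createGetportMount below):

    XDR request = new XDR();
    request.writeInt(42);      // xid: any value unique to the client
    request.writeInt(0);       // message type: CALL
    request.writeInt(2);       // RPC protocol version
    request.writeInt(100000);  // program: portmapper
    request.writeInt(2);       // program version
    request.writeInt(3);       // procedure: PMAPPROC_GETPORT
    request.writeInt(0);       // credential flavor: AUTH_NONE
    request.writeInt(0);       // credential length
    request.writeInt(0);       // verifier flavor: AUTH_NONE
    request.writeInt(0);       // verifier length
    // ...followed by the queried mapping: prog, vers, prot, port.
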
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Wed Oct 16 21:07:28 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.Test;
@@ -38,7 +38,7 @@ import org.mockito.Mockito;
 public class TestFrameDecoder {
 
   private static int port = 12345; // some random server port
-  private static XDR result = null;
+  private static int resultSize;
 
   static void testRequest(XDR request) {
     SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -49,18 +49,20 @@ public class TestFrameDecoder {
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
-        int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
-      super(program, host, port, progNumber, lowProgVersion, highProgVersion,
-          cacheSize);
+        int progNumber, int lowProgVersion, int highProgVersion) {
+      super(program, host, port, progNumber, lowProgVersion, highProgVersion);
     }
 
     @Override
-    public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-        InetAddress client, Channel channel) {
-      // Get the final complete request and return a void response.
-      result = in;
-      RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out);
-      return out;
+    protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+      resultSize = info.data().readableBytes();
+      RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+          new VerifierNone());
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
     }
 
     @Override
@@ -147,21 +149,22 @@ public class TestFrameDecoder {
   public void testFrames() {
 
     RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
-        "localhost", port, 100000, 1, 2, 100);
+        "localhost", port, 100000, 1, 2);
     SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
     tcpServer.run();
 
     XDR xdrOut = createGetportMount();
+    int headerSize = xdrOut.size();
     int bufsize = 2 * 1024 * 1024;
     byte[] buffer = new byte[bufsize];
     xdrOut.writeFixedOpaque(buffer);
-    int requestSize = xdrOut.size();
+    int requestSize = xdrOut.size() - headerSize;
 
     // Send the request to the server
     testRequest(xdrOut);
 
     // Verify the server got the request with right size
-    assertTrue(requestSize == result.size());
+    assertEquals(requestSize, resultSize);
   }
 
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@@ -173,10 +176,6 @@ public class TestFrameDecoder {
   static XDR createGetportMount() {
     XDR xdr_out = new XDR();
     createPortmapXDRheader(xdr_out, 3);
-    xdr_out.writeInt(0); // AUTH_NULL
-    xdr_out.writeInt(0); // cred len
-    xdr_out.writeInt(0); // verifier AUTH_NULL
-    xdr_out.writeInt(0); // verf len
     return xdr_out;
   }
   /*

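The assertion change reflects where sizes are now measured: RpcCall.read in RpcMessageParserStage consumes the call header before RpcInfo is built, so resultSize on the server covers only the opaque payload, and the test subtracts headerSize on the client. The bookkeeping, written out (h stands for whatever createGetportMount produces; 2 MB is a multiple of 4, so writeFixedOpaque adds no padding):

    // Client side:
    //   xdrOut.size() after createGetportMount()   == h   (headerSize)
    //   xdrOut.size() after writeFixedOpaque(2 MB) == h + 2*1024*1024
    //   requestSize = (h + 2*1024*1024) - h        == 2*1024*1024
    // Server side:
    //   RpcCall.read(in) consumes the h header bytes, so
    //   info.data().readableBytes() == 2*1024*1024 == resultSize
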
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Wed Oct 16 21:07:28 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
 import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
 import org.junit.Test;
 
+import static org.mockito.Mockito.*;
+
 /**
  * Unit tests for {@link RpcCallCache}
  */
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
     validateInprogressCacheEntry(e);
     
     // Set call as completed
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     cache.callCompleted(clientIp, xid, response);
     e = cache.checkOrAddToCache(clientIp, xid);
     validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
     assertNull(c.getResponse());
   }
   
-  private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+  private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
     assertFalse(c.isInProgress());
     assertTrue(c.isCompleted());
     assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
     assertFalse(c.isCompleted());
     assertNull(c.getResponse());
     
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     c.setResponse(response);
     validateCompletedCacheEntry(c, response);
   }
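
With responses cached as RpcResponse rather than raw XDR, a duplicate request can be answered by resending the exact framed reply. A sketch of the flow these tests exercise (the RpcCallCache constructor arguments and the null-on-new-request return convention are assumptions, as neither appears in this hunk):

    import java.net.InetAddress;
    import static org.mockito.Mockito.mock;

    RpcCallCache cache = new RpcCallCache("test", 10);     // assumed signature
    InetAddress clientIp = InetAddress.getByName("203.0.113.1");
    int xid = 0x1234;

    CacheEntry e = cache.checkOrAddToCache(clientIp, xid); // assumed: null, added as in-progress
    RpcResponse response = mock(RpcResponse.class);
    cache.callCompleted(clientIp, xid, response);          // record the framed reply

    e = cache.checkOrAddToCache(clientIp, xid);            // retransmission arrives
    if (e != null && e.isCompleted()) {
      // resend e.getResponse() instead of re-executing the call
    }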