You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2013/10/16 23:07:35 UTC
svn commit: r1532910 [3/3] - in
/hadoop/common/branches/HDFS-2832/hadoop-common-project: hadoop-common/
hadoop-common/src/main/bin/ hadoop-common/src/main/conf/
hadoop-common/src/main/docs/ hadoop-common/src/main/java/
hadoop-common/src/main/java/org/a...
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java Wed Oct 16 21:07:28 2013
@@ -17,29 +17,592 @@
*/
package org.apache.hadoop.io;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
-import junit.framework.TestCase;
+import static org.mockito.Mockito.*;
-public class TestMapFile extends TestCase {
+public class TestMapFile {
+
+ private static final Path TEST_DIR = new Path(
+ System.getProperty("test.build.data", "/tmp"),
+ TestMapFile.class.getSimpleName());
+
private static Configuration conf = new Configuration();
+ @Before
+ public void setup() throws Exception {
+ LocalFileSystem fs = FileSystem.getLocal(conf);
+ if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
+ Assert.fail("Can't clean up test root dir");
+ }
+ fs.mkdirs(TEST_DIR);
+ }
+
+ private static final Progressable defaultProgressable = new Progressable() {
+ @Override
+ public void progress() {
+ }
+ };
+
+ private static final CompressionCodec defaultCodec = new CompressionCodec() {
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return null;
+ }
+
+ @Override
+ public Compressor createCompressor() {
+ return null;
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return null;
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ return null;
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return null;
+ }
+ };
+
+ private MapFile.Writer createWriter(String fileName,
+ Class<? extends WritableComparable<?>> keyClass,
+ Class<? extends Writable> valueClass) throws IOException {
+ Path dirName = new Path(TEST_DIR, fileName);
+ MapFile.Writer.setIndexInterval(conf, 4);
+ return new MapFile.Writer(conf, dirName, MapFile.Writer.keyClass(keyClass),
+ MapFile.Writer.valueClass(valueClass));
+ }
+
+ private MapFile.Reader createReader(String fileName,
+ Class<? extends WritableComparable<?>> keyClass) throws IOException {
+ Path dirName = new Path(TEST_DIR, fileName);
+ return new MapFile.Reader(dirName, conf,
+ MapFile.Reader.comparator(new WritableComparator(keyClass)));
+ }
+
+ /**
+ * test {@code MapFile.Reader.getClosest()} method
+ *
+ */
+ @Test
+ public void testGetClosestOnCurrentApi() throws Exception {
+ final String TEST_PREFIX = "testGetClosestOnCurrentApi.mapfile";
+ MapFile.Writer writer = createWriter(TEST_PREFIX, Text.class, Text.class);
+ int FIRST_KEY = 1;
+ // Test keys: 1,11,21,...,91
+ for (int i = FIRST_KEY; i < 100; i += 10) {
+ Text t = new Text(Integer.toString(i));
+ writer.append(t, t);
+ }
+ writer.close();
+
+ MapFile.Reader reader = createReader(TEST_PREFIX, Text.class);
+ Text key = new Text("55");
+ Text value = new Text();
+
+ // Test get closest with step forward
+ Text closest = (Text) reader.getClosest(key, value);
+ assertEquals(new Text("61"), closest);
+
+ // Test get closest with step back
+ closest = (Text) reader.getClosest(key, value, true);
+ assertEquals(new Text("51"), closest);
+
+ // Test get closest when we pass explicit key
+ final Text explicitKey = new Text("21");
+ closest = (Text) reader.getClosest(explicitKey, value);
+ assertEquals(new Text("21"), explicitKey);
+
+ // Test what happens at boundaries. Assert if searching a key that is
+ // less than first key in the mapfile, that the first key is returned.
+ key = new Text("00");
+ closest = (Text) reader.getClosest(key, value);
+ assertEquals(FIRST_KEY, Integer.parseInt(closest.toString()));
+
+ // Assert that null is returned if key is > last entry in mapfile.
+ key = new Text("92");
+ closest = (Text) reader.getClosest(key, value);
+ assertNull("Not null key in testGetClosestWithNewCode", closest);
+
+ // If we were looking for the key before, we should get the last key
+ closest = (Text) reader.getClosest(key, value, true);
+ assertEquals(new Text("91"), closest);
+ }
+
+ /**
+ * test {@code MapFile.Reader.midKey() } method
+ */
+ @Test
+ public void testMidKeyOnCurrentApi() throws Exception {
+ // Write a mapfile of simple data: keys are the integers 0..SIZE-1
+ final String TEST_PREFIX = "testMidKeyOnCurrentApi.mapfile";
+ MapFile.Writer writer = createWriter(TEST_PREFIX, IntWritable.class,
+ IntWritable.class);
+ // 0,1,....9
+ int SIZE = 10;
+ for (int i = 0; i < SIZE; i++)
+ writer.append(new IntWritable(i), new IntWritable(i));
+ writer.close();
+
+ MapFile.Reader reader = createReader(TEST_PREFIX, IntWritable.class);
+ assertEquals(new IntWritable((SIZE - 1) / 2), reader.midKey());
+ }
+
+ /**
+ * test {@code MapFile.rename()} method
+ */
+ @Test
+ public void testRename() {
+ final String NEW_FILE_NAME = "test-new.mapfile";
+ final String OLD_FILE_NAME = "test-old.mapfile";
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+ IntWritable.class);
+ writer.close();
+ MapFile.rename(fs, new Path(TEST_DIR, OLD_FILE_NAME).toString(),
+ new Path(TEST_DIR, NEW_FILE_NAME).toString());
+ MapFile.delete(fs, new Path(TEST_DIR, NEW_FILE_NAME).toString());
+ } catch (IOException ex) {
+ fail("testRename error " + ex);
+ }
+ }
+
+ /**
+ * test {@code MapFile.rename()}
+ * method with throwing {@code IOException}
+ */
+ @Test
+ public void testRenameWithException() {
+ final String ERROR_MESSAGE = "Can't rename file";
+ final String NEW_FILE_NAME = "test-new.mapfile";
+ final String OLD_FILE_NAME = "test-old.mapfile";
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileSystem spyFs = spy(fs);
+
+ MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+ IntWritable.class);
+ writer.close();
+
+ Path oldDir = new Path(TEST_DIR, OLD_FILE_NAME);
+ Path newDir = new Path(TEST_DIR, NEW_FILE_NAME);
+ when(spyFs.rename(oldDir, newDir)).thenThrow(
+ new IOException(ERROR_MESSAGE));
+
+ MapFile.rename(spyFs, oldDir.toString(), newDir.toString());
+ fail("testRenameWithException no exception error !!!");
+ } catch (IOException ex) {
+ assertEquals("testRenameWithException invalid IOExceptionMessage !!!",
+ ex.getMessage(), ERROR_MESSAGE);
+ }
+ }
+
+ @Test
+ public void testRenameWithFalse() {
+ final String ERROR_MESSAGE = "Could not rename";
+ final String NEW_FILE_NAME = "test-new.mapfile";
+ final String OLD_FILE_NAME = "test-old.mapfile";
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileSystem spyFs = spy(fs);
+
+ MapFile.Writer writer = createWriter(OLD_FILE_NAME, IntWritable.class,
+ IntWritable.class);
+ writer.close();
+
+ Path oldDir = new Path(TEST_DIR, OLD_FILE_NAME);
+ Path newDir = new Path(TEST_DIR, NEW_FILE_NAME);
+ when(spyFs.rename(oldDir, newDir)).thenReturn(false);
+
+ MapFile.rename(spyFs, oldDir.toString(), newDir.toString());
+ fail("testRenameWithException no exception error !!!");
+ } catch (IOException ex) {
+ assertTrue("testRenameWithFalse invalid IOExceptionMessage error !!!", ex
+ .getMessage().startsWith(ERROR_MESSAGE));
+ }
+ }
+
+ /**
+ * test throwing {@code IOException} in {@code MapFile.Writer} constructor
+ */
+ @Test
+ public void testWriteWithFailDirCreation() {
+ String ERROR_MESSAGE = "Mkdirs failed to create directory";
+ Path dirName = new Path(TEST_DIR, "fail.mapfile");
+ MapFile.Writer writer = null;
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileSystem spyFs = spy(fs);
+ Path pathSpy = spy(dirName);
+ when(pathSpy.getFileSystem(conf)).thenReturn(spyFs);
+ when(spyFs.mkdirs(dirName)).thenReturn(false);
+
+ writer = new MapFile.Writer(conf, pathSpy,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(Text.class));
+ fail("testWriteWithFailDirCreation error !!!");
+ } catch (IOException ex) {
+ assertTrue("testWriteWithFailDirCreation ex error !!!", ex.getMessage()
+ .startsWith(ERROR_MESSAGE));
+ } finally {
+ if (writer != null)
+ try {
+ writer.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /**
+ * test {@code MapFile.Reader.finalKey()} method
+ */
+ @Test
+ public void testOnFinalKey() {
+ final String TEST_METHOD_KEY = "testOnFinalKey.mapfile";
+ int SIZE = 10;
+ try {
+ MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+ IntWritable.class);
+ for (int i = 0; i < SIZE; i++)
+ writer.append(new IntWritable(i), new IntWritable(i));
+ writer.close();
+
+ MapFile.Reader reader = createReader(TEST_METHOD_KEY, IntWritable.class);
+ IntWritable expectedKey = new IntWritable(0);
+ reader.finalKey(expectedKey);
+ assertEquals("testOnFinalKey not same !!!", expectedKey, new IntWritable(
+ 9));
+ } catch (IOException ex) {
+ fail("testOnFinalKey error !!!");
+ }
+ }
+
+ /**
+ * test {@code MapFile.Writer} constructor with key, value
+ * and validate it with {@code keyClass(), valueClass()} methods
+ */
+ @Test
+ public void testKeyValueClasses() {
+ Class<? extends WritableComparable<?>> keyClass = IntWritable.class;
+ Class<?> valueClass = Text.class;
+ try {
+ createWriter("testKeyValueClasses.mapfile", IntWritable.class, Text.class);
+ assertNotNull("writer key class null error !!!",
+ MapFile.Writer.keyClass(keyClass));
+ assertNotNull("writer value class null error !!!",
+ MapFile.Writer.valueClass(valueClass));
+ } catch (IOException ex) {
+ fail(ex.getMessage());
+ }
+ }
+
+ /**
+ * test {@code MapFile.Reader.getClosest()} with a key of the wrong class
+ */
+ @Test
+ public void testReaderGetClosest() throws Exception {
+ final String TEST_METHOD_KEY = "testReaderWithWrongKeyClass.mapfile";
+ try {
+ MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+ Text.class);
+
+ for (int i = 0; i < 10; i++)
+ writer.append(new IntWritable(i), new Text("value" + i));
+ writer.close();
+
+ MapFile.Reader reader = createReader(TEST_METHOD_KEY, Text.class);
+ reader.getClosest(new Text("2"), new Text(""));
+ fail("no excepted exception in testReaderWithWrongKeyClass !!!");
+ } catch (IOException ex) {
+ /* Should be thrown to pass the test */
+ }
+ }
+
+ /**
+ * test {@code MapFile.Writer.append()} with wrong value class
+ */
+ @Test
+ public void testReaderWithWrongValueClass() {
+ final String TEST_METHOD_KEY = "testReaderWithWrongValueClass.mapfile";
+ try {
+ MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+ Text.class);
+ writer.append(new IntWritable(0), new IntWritable(0));
+ fail("no excepted exception in testReaderWithWrongKeyClass !!!");
+ } catch (IOException ex) {
+ /* Should be thrown to pass the test */
+ }
+ }
+
+ /**
+ * test {@code MapFile.Reader.next(key, value)} for iteration.
+ */
+ @Test
+ public void testReaderKeyIteration() {
+ final String TEST_METHOD_KEY = "testReaderKeyIteration.mapfile";
+ int SIZE = 10;
+ int ITERATIONS = 5;
+ try {
+ MapFile.Writer writer = createWriter(TEST_METHOD_KEY, IntWritable.class,
+ Text.class);
+ int start = 0;
+ for (int i = 0; i < SIZE; i++)
+ writer.append(new IntWritable(i), new Text("Value:" + i));
+ writer.close();
+
+ MapFile.Reader reader = createReader(TEST_METHOD_KEY, IntWritable.class);
+ // test iteration
+ Writable startValue = new Text("Value:" + start);
+ int i = 0;
+ while (i++ < ITERATIONS) {
+ IntWritable key = new IntWritable(start);
+ Writable value = startValue;
+ while (reader.next(key, value)) {
+ assertNotNull(key);
+ assertNotNull(value);
+ }
+ reader.reset();
+ }
+ assertTrue("reader seek error !!!",
+ reader.seek(new IntWritable(SIZE / 2)));
+ assertFalse("reader seek error !!!",
+ reader.seek(new IntWritable(SIZE * 2)));
+ } catch (IOException ex) {
+ fail("reader seek error !!!");
+ }
+ }
+
+ /**
+ * test {@code MapFile.fix()} method
+ */
+ @Test
+ public void testFix() {
+ final String INDEX_LESS_MAP_FILE = "testFix.mapfile";
+ int PAIR_SIZE = 20;
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(TEST_DIR, INDEX_LESS_MAP_FILE);
+ MapFile.Writer writer = createWriter(INDEX_LESS_MAP_FILE,
+ IntWritable.class, Text.class);
+ for (int i = 0; i < PAIR_SIZE; i++)
+ writer.append(new IntWritable(0), new Text("value"));
+ writer.close();
+
+ File indexFile = new File(".", "." + INDEX_LESS_MAP_FILE + "/index");
+ boolean isDeleted = false;
+ if (indexFile.exists())
+ isDeleted = indexFile.delete();
+
+ if (isDeleted)
+ assertTrue("testFix error !!!",
+ MapFile.fix(fs, dir, IntWritable.class, Text.class, true, conf) == PAIR_SIZE);
+ } catch (Exception ex) {
+ fail("testFix error !!!");
+ }
+ }
+ /**
+ * test the deprecated constructors of {@code MapFile.Writer} and {@code MapFile.Reader}
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDeprecatedConstructors() {
+ String path = new Path(TEST_DIR, "writes.mapfile").toString();
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ MapFile.Writer writer = new MapFile.Writer(conf, fs, path,
+ IntWritable.class, Text.class, CompressionType.RECORD);
+ assertNotNull(writer);
+ writer = new MapFile.Writer(conf, fs, path, IntWritable.class,
+ Text.class, CompressionType.RECORD, defaultProgressable);
+ assertNotNull(writer);
+ writer = new MapFile.Writer(conf, fs, path, IntWritable.class,
+ Text.class, CompressionType.RECORD, defaultCodec, defaultProgressable);
+ assertNotNull(writer);
+ writer = new MapFile.Writer(conf, fs, path,
+ WritableComparator.get(Text.class), Text.class);
+ assertNotNull(writer);
+ writer = new MapFile.Writer(conf, fs, path,
+ WritableComparator.get(Text.class), Text.class,
+ SequenceFile.CompressionType.RECORD);
+ assertNotNull(writer);
+ writer = new MapFile.Writer(conf, fs, path,
+ WritableComparator.get(Text.class), Text.class,
+ CompressionType.RECORD, defaultProgressable);
+ assertNotNull(writer);
+ writer.close();
+
+ MapFile.Reader reader = new MapFile.Reader(fs, path,
+ WritableComparator.get(IntWritable.class), conf);
+ assertNotNull(reader);
+ assertNotNull("reader key is null !!!", reader.getKeyClass());
+ assertNotNull("reader value in null", reader.getValueClass());
+
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * test {@code MapFile.Writer} constructor
+ * with IllegalArgumentException
+ *
+ */
+ @Test
+ public void testKeyLessWriterCreation() {
+ MapFile.Writer writer = null;
+ try {
+ writer = new MapFile.Writer(conf, TEST_DIR);
+ fail("fail in testKeyLessWriterCreation !!!");
+ } catch (IllegalArgumentException ex) {
+ } catch (Exception e) {
+ fail("fail in testKeyLessWriterCreation. Other ex !!!");
+ } finally {
+ if (writer != null)
+ try {
+ writer.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ /**
+ * test {@code MapFile.Writer} constructor with IOException
+ */
+ @Test
+ public void testPathExplosionWriterCreation() {
+ Path path = new Path(TEST_DIR, "testPathExplosionWriterCreation.mapfile");
+ String TEST_ERROR_MESSAGE = "Mkdirs failed to create directory "
+ + path.getName();
+ MapFile.Writer writer = null;
+ try {
+ FileSystem fsSpy = spy(FileSystem.get(conf));
+ Path pathSpy = spy(path);
+ when(fsSpy.mkdirs(path)).thenThrow(new IOException(TEST_ERROR_MESSAGE));
+
+ when(pathSpy.getFileSystem(conf)).thenReturn(fsSpy);
+
+ writer = new MapFile.Writer(conf, pathSpy,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(IntWritable.class));
+ fail("fail in testPathExplosionWriterCreation !!!");
+ } catch (IOException ex) {
+ assertEquals("testPathExplosionWriterCreation ex message error !!!",
+ ex.getMessage(), TEST_ERROR_MESSAGE);
+ } catch (Exception e) {
+ fail("fail in testPathExplosionWriterCreation. Other ex !!!");
+ } finally {
+ if (writer != null)
+ try {
+ writer.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /**
+ * test {@code MapFile.Writer.append} method with desc order
+ */
+ @Test
+ public void testDescOrderWithThrowExceptionWriterAppend() {
+ try {
+ MapFile.Writer writer = createWriter(".mapfile", IntWritable.class,
+ Text.class);
+ writer.append(new IntWritable(2), new Text("value: " + 1));
+ writer.append(new IntWritable(2), new Text("value: " + 2));
+ writer.append(new IntWritable(2), new Text("value: " + 4));
+ writer.append(new IntWritable(1), new Text("value: " + 3));
+ fail("testDescOrderWithThrowExceptionWriterAppend not expected exception error !!!");
+ } catch (IOException ex) {
+ } catch (Exception e) {
+ fail("testDescOrderWithThrowExceptionWriterAppend other ex throw !!!");
+ }
+ }
+
+ @Test
+ public void testMainMethodMapFile() {
+ String path = new Path(TEST_DIR, "mainMethodMapFile.mapfile").toString();
+ String inFile = "mainMethodMapFile.mapfile";
+ String outFile = "mainMethodMapFile.mapfile";
+ String[] args = { path, outFile };
+ try {
+ MapFile.Writer writer = createWriter(inFile, IntWritable.class,
+ Text.class);
+ writer.append(new IntWritable(1), new Text("test_text1"));
+ writer.append(new IntWritable(2), new Text("test_text2"));
+ writer.close();
+ MapFile.main(args);
+ } catch (Exception ex) {
+ fail("testMainMethodMapFile error !!!");
+ }
+ }
+
/**
* Test getClosest feature.
+ *
* @throws Exception
*/
+ @Test
+ @SuppressWarnings("deprecation")
public void testGetClosest() throws Exception {
- // Write a mapfile of simple data: keys are
- Path dirName = new Path(System.getProperty("test.build.data",".") +
- getName() + ".mapfile");
+ // Write a mapfile of simple data: keys are
+ Path dirName = new Path(TEST_DIR, "testGetClosest.mapfile");
FileSystem fs = FileSystem.getLocal(conf);
Path qualifiedDirName = fs.makeQualified(dirName);
// Make an index entry for every third insertion.
MapFile.Writer.setIndexInterval(conf, 3);
MapFile.Writer writer = new MapFile.Writer(conf, fs,
- qualifiedDirName.toString(), Text.class, Text.class);
+ qualifiedDirName.toString(), Text.class, Text.class);
// Assert that the index interval is 1
assertEquals(3, writer.getIndexInterval());
// Add entries up to 100 in intervals of ten.
@@ -51,74 +614,84 @@ public class TestMapFile extends TestCas
}
writer.close();
// Now do getClosest on created mapfile.
- MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
- conf);
+ MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+ try {
Text key = new Text("55");
Text value = new Text();
- Text closest = (Text)reader.getClosest(key, value);
+ Text closest = (Text) reader.getClosest(key, value);
// Assert that closest after 55 is 60
assertEquals(new Text("60"), closest);
// Get closest that falls before the passed key: 50
- closest = (Text)reader.getClosest(key, value, true);
+ closest = (Text) reader.getClosest(key, value, true);
assertEquals(new Text("50"), closest);
// Test get closest when we pass explicit key
final Text TWENTY = new Text("20");
- closest = (Text)reader.getClosest(TWENTY, value);
+ closest = (Text) reader.getClosest(TWENTY, value);
assertEquals(TWENTY, closest);
- closest = (Text)reader.getClosest(TWENTY, value, true);
+ closest = (Text) reader.getClosest(TWENTY, value, true);
assertEquals(TWENTY, closest);
- // Test what happens at boundaries. Assert if searching a key that is
+ // Test what happens at boundaries. Assert if searching a key that is
// less than first key in the mapfile, that the first key is returned.
key = new Text("00");
- closest = (Text)reader.getClosest(key, value);
+ closest = (Text) reader.getClosest(key, value);
assertEquals(FIRST_KEY, Integer.parseInt(closest.toString()));
-
- // If we're looking for the first key before, and we pass in a key before
+
+ // If we're looking for the first key before, and we pass in a key before
// the first key in the file, we should get null
- closest = (Text)reader.getClosest(key, value, true);
+ closest = (Text) reader.getClosest(key, value, true);
assertNull(closest);
-
+
// Assert that null is returned if key is > last entry in mapfile.
key = new Text("99");
- closest = (Text)reader.getClosest(key, value);
+ closest = (Text) reader.getClosest(key, value);
assertNull(closest);
// If we were looking for the key before, we should get the last key
- closest = (Text)reader.getClosest(key, value, true);
+ closest = (Text) reader.getClosest(key, value, true);
assertEquals(new Text("90"), closest);
+ } finally {
+ reader.close();
+ }
}
+ @Test
+ @SuppressWarnings("deprecation")
public void testMidKey() throws Exception {
- // Write a mapfile of simple data: keys are
- Path dirName = new Path(System.getProperty("test.build.data",".") +
- getName() + ".mapfile");
+ // Write a mapfile of simple data: keys are
+ Path dirName = new Path(TEST_DIR, "testMidKey.mapfile");
FileSystem fs = FileSystem.getLocal(conf);
Path qualifiedDirName = fs.makeQualified(dirName);
-
+
MapFile.Writer writer = new MapFile.Writer(conf, fs,
- qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
+ qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
writer.append(new IntWritable(1), new IntWritable(1));
writer.close();
// Now do getClosest on created mapfile.
- MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
- conf);
- assertEquals(new IntWritable(1), reader.midKey());
+ MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+ try {
+ assertEquals(new IntWritable(1), reader.midKey());
+ } finally {
+ reader.close();
+ }
}
-
+ @Test
+ @SuppressWarnings("deprecation")
public void testMidKeyEmpty() throws Exception {
- // Write a mapfile of simple data: keys are
- Path dirName = new Path(System.getProperty("test.build.data",".") +
- getName() + ".mapfile");
+ // Write a mapfile of simple data: keys are
+ Path dirName = new Path(TEST_DIR, "testMidKeyEmpty.mapfile");
FileSystem fs = FileSystem.getLocal(conf);
Path qualifiedDirName = fs.makeQualified(dirName);
-
+
MapFile.Writer writer = new MapFile.Writer(conf, fs,
- qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
+ qualifiedDirName.toString(), IntWritable.class, IntWritable.class);
writer.close();
// Now do getClosest on created mapfile.
- MapFile.Reader reader = new MapFile.Reader(fs, qualifiedDirName.toString(),
- conf);
- assertEquals(null, reader.midKey());
+ MapFile.Reader reader = new MapFile.Reader(qualifiedDirName, conf);
+ try {
+ assertEquals(null, reader.midKey());
+ } finally {
+ reader.close();
+ }
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSetFile.java Wed Oct 16 21:07:28 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.io;
import java.io.*;
import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
import junit.framework.TestCase;
import org.apache.commons.logging.*;
@@ -51,6 +53,39 @@ public class TestSetFile extends TestCas
fs.close();
}
}
+
+ /**
+ * test {@code SetFile.Reader} methods
+ * next(), get() in combination
+ */
+ public void testSetFileAccessMethods() {
+ try {
+ FileSystem fs = FileSystem.getLocal(conf);
+ int size = 10;
+ writeData(fs, size);
+ SetFile.Reader reader = createReader(fs);
+ assertTrue("testSetFileWithConstruction1 error !!!", reader.next(new IntWritable(0)));
+ // don't know why reader.get(i) return i+1
+ assertEquals("testSetFileWithConstruction2 error !!!", new IntWritable(size/2 + 1), reader.get(new IntWritable(size/2)));
+ assertNull("testSetFileWithConstruction3 error !!!", reader.get(new IntWritable(size*2)));
+ } catch (Exception ex) {
+ fail("testSetFileWithConstruction error !!!");
+ }
+ }
+
+ private SetFile.Reader createReader(FileSystem fs) throws IOException {
+ return new SetFile.Reader(fs, FILE,
+ WritableComparator.get(IntWritable.class), conf);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void writeData(FileSystem fs, int elementSize) throws IOException {
+ MapFile.delete(fs, FILE);
+ SetFile.Writer writer = new SetFile.Writer(fs, FILE, IntWritable.class);
+ for (int i = 0; i < elementSize; i++)
+ writer.append(new IntWritable(i));
+ writer.close();
+ }
private static RandomDatum[] generate(int count) {
LOG.info("generating " + count + " records in memory");
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java Wed Oct 16 21:07:28 2013
@@ -19,11 +19,12 @@
package org.apache.hadoop.io;
import junit.framework.TestCase;
-
import java.io.IOException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Random;
+import com.google.common.primitives.Bytes;
/** Unit tests for LargeUTF8. */
public class TestText extends TestCase {
@@ -321,7 +322,81 @@ public class TestText extends TestCase {
(new Text("foo"),
"{\"type\":\"string\",\"java-class\":\"org.apache.hadoop.io.Text\"}");
}
-
+
+ /**
+ * test {@code Text.charAt()} for valid and out-of-range indices
+ */
+ public void testCharAt() {
+ String line = "adsawseeeeegqewgasddga";
+ Text text = new Text(line);
+ for (int i = 0; i < line.length(); i++) {
+ assertTrue("testCharAt error1 !!!", text.charAt(i) == line.charAt(i));
+ }
+ assertEquals("testCharAt error2 !!!", -1, text.charAt(-1));
+ assertEquals("testCharAt error3 !!!", -1, text.charAt(100));
+ }
+
+ /**
+ * test {@code Text} readFields/write operations
+ */
+ public void testReadWriteOperations() {
+ String line = "adsawseeeeegqewgasddga";
+ byte[] inputBytes = line.getBytes();
+ inputBytes = Bytes.concat(new byte[] {(byte)22}, inputBytes);
+
+ DataInputBuffer in = new DataInputBuffer();
+ DataOutputBuffer out = new DataOutputBuffer();
+ Text text = new Text(line);
+ try {
+ in.reset(inputBytes, inputBytes.length);
+ text.readFields(in);
+ } catch(Exception ex) {
+ fail("testReadFields error !!!");
+ }
+ try {
+ text.write(out);
+ } catch(IOException ex) {
+ } catch(Exception ex) {
+ fail("testReadWriteOperations error !!!");
+ }
+ }
+
+ /**
+ * test {@code Text.bytesToCodePoint(bytes) }
+ * with {@code BufferUnderflowException}
+ *
+ */
+ public void testBytesToCodePoint() {
+ try {
+ ByteBuffer bytes = ByteBuffer.wrap(new byte[] {-2, 45, 23, 12, 76, 89});
+ Text.bytesToCodePoint(bytes);
+ assertTrue("testBytesToCodePoint error !!!", bytes.position() == 6 );
+ } catch (BufferUnderflowException ex) {
+ fail("testBytesToCodePoint unexp exception");
+ } catch (Exception e) {
+ fail("testBytesToCodePoint unexp exception");
+ }
+ }
+
+ public void testbytesToCodePointWithInvalidUTF() {
+ try {
+ Text.bytesToCodePoint(ByteBuffer.wrap(new byte[] {-2}));
+ fail("testbytesToCodePointWithInvalidUTF error unexp exception !!!");
+ } catch (BufferUnderflowException ex) {
+ } catch(Exception e) {
+ fail("testbytesToCodePointWithInvalidUTF error unexp exception !!!");
+ }
+ }
+
+ public void testUtf8Length() {
+ assertEquals("testUtf8Length1 error !!!", 1, Text.utf8Length(new String(new char[]{(char)1})));
+ assertEquals("testUtf8Length127 error !!!", 1, Text.utf8Length(new String(new char[]{(char)127})));
+ assertEquals("testUtf8Length128 error !!!", 2, Text.utf8Length(new String(new char[]{(char)128})));
+ assertEquals("testUtf8Length193 error !!!", 2, Text.utf8Length(new String(new char[]{(char)193})));
+ assertEquals("testUtf8Length225 error !!!", 2, Text.utf8Length(new String(new char[]{(char)225})));
+ assertEquals("testUtf8Length254 error !!!", 2, Text.utf8Length(new String(new char[]{(char)254})));
+ }
+
public static void main(String[] args) throws Exception
{
TestText test = new TestText("main");
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java Wed Oct 16 21:07:28 2013
@@ -16,11 +16,21 @@
*/
package org.apache.hadoop.security;
-import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.TestSaslRPC;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
import org.junit.*;
-import static org.mockito.Mockito.*;
-
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -30,21 +40,13 @@ import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
-import javax.security.auth.Subject;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.LoginContext;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import static org.apache.hadoop.test.MetricsAsserts.*;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL;
-import org.apache.hadoop.util.Shell;
+import static org.apache.hadoop.ipc.TestSaslRPC.*;
+import static org.apache.hadoop.security.token.delegation.TestDelegationToken.TestDelegationTokenIdentifier;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestUserGroupInformation {
final private static String USER_NAME = "user1@HADOOP.APACHE.ORG";
@@ -786,4 +788,29 @@ public class TestUserGroupInformation {
UserGroupInformation.setLoginUser(ugi);
assertEquals(ugi, UserGroupInformation.getLoginUser());
}
+
+ /**
+ * In some scenarios, such as HA, delegation tokens are associated with a
+ * logical name. The tokens are cloned and are associated with the
+ * physical address of the server where the service is provided.
+ * This test ensures cloned delegated tokens are locally used
+ * and are not returned in {@link UserGroupInformation#getCredentials()}
+ */
+ @Test
+ public void testPrivateTokenExclusion() throws Exception {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ TestTokenIdentifier tokenId = new TestTokenIdentifier();
+ Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(
+ tokenId.getBytes(), "password".getBytes(),
+ tokenId.getKind(), null);
+ ugi.addToken(new Text("regular-token"), token);
+
+ // Now add cloned private token
+ ugi.addToken(new Text("private-token"), new Token.PrivateToken<TestTokenIdentifier>(token));
+ ugi.addToken(new Text("private-token1"), new Token.PrivateToken<TestTokenIdentifier>(token));
+
+ // Ensure only non-private tokens are returned
+ Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
+ assertEquals(1, tokens.size());
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java Wed Oct 16 21:07:28 2013
@@ -22,13 +22,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.oncrpc.RpcProgram;
-import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.SimpleTcpServer;
-import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
import org.apache.hadoop.portmap.PortmapMapping;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
/**
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -72,19 +67,7 @@ public abstract class Nfs3Base {
private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(nfsPort,
- rpcProgram, 0) {
- @Override
- public ChannelPipelineFactory getPipelineFactory() {
- return new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- RpcUtil.constructRpcFrameDecoder(),
- new SimpleTcpServerHandler(rpcProgram));
- }
- };
- }
- };
+ rpcProgram, 0);
tcpServer.run();
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java Wed Oct 16 21:07:28 2013
@@ -97,6 +97,6 @@ public interface Nfs3Interface {
InetAddress client);
/** COMMIT: Commit cached data on a server to stable storage */
- public NFS3Response commit(XDR xdr, SecurityHandler securityHandler,
- InetAddress client);
+ public NFS3Response commit(XDR xdr, Channel channel, int xid,
+ SecurityHandler securityHandler, InetAddress client);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java Wed Oct 16 21:07:28 2013
@@ -28,8 +28,8 @@ import org.apache.hadoop.oncrpc.XDR;
* WRITE3 Request
*/
public class WRITE3Request extends RequestWithHandle {
- private final long offset;
- private final int count;
+ private long offset;
+ private int count;
private final WriteStableHow stableHow;
private final ByteBuffer data;
@@ -54,10 +54,18 @@ public class WRITE3Request extends Reque
return this.offset;
}
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
public int getCount() {
return this.count;
}
+ public void setCount(int count) {
+ this.count = count;
+ }
+
public WriteStableHow getStableHow() {
return this.stableHow;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIR3Response.java Wed Oct 16 21:07:28 2013
@@ -26,6 +26,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3St
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Verifier;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* READDIR3 Response
*/
@@ -49,7 +51,8 @@ public class READDIR3Response extends NF
return fileId;
}
- String getName() {
+ @VisibleForTesting
+ public String getName() {
return name;
}
@@ -66,6 +69,11 @@ public class READDIR3Response extends NF
this.entries = Collections.unmodifiableList(Arrays.asList(entries));
this.eof = eof;
}
+
+ @VisibleForTesting
+ public List<Entry3> getEntries() {
+ return this.entries;
+ }
}
public READDIR3Response(int status) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/response/READDIRPLUS3Response.java Wed Oct 16 21:07:28 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.Nfs3St
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Verifier;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* READDIRPLUS3 Response
*/
@@ -51,6 +53,11 @@ public class READDIRPLUS3Response exten
this.objFileHandle = objFileHandle;
}
+ @VisibleForTesting
+ public String getName() {
+ return name;
+ }
+
void seralize(XDR xdr) {
xdr.writeLongAsHyper(fileId);
xdr.writeString(name);
@@ -71,7 +78,8 @@ public class READDIRPLUS3Response exten
this.eof = eof;
}
- List<EntryPlus3> getEntries() {
+ @VisibleForTesting
+ public List<EntryPlus3> getEntries() {
return entries;
}
@@ -80,6 +88,11 @@ public class READDIRPLUS3Response exten
}
}
+ @VisibleForTesting
+ public DirListPlus3 getDirListPlus() {
+ return dirListPlus;
+ }
+
public READDIRPLUS3Response(int status) {
this(status, null, 0, null);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Wed Oct 16 21:07:28 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
public class RpcCallCache {
public static class CacheEntry {
- private XDR response; // null if no response has been sent
+ private RpcResponse response; // null if no response has been sent
public CacheEntry() {
response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
return response != null;
}
- public XDR getResponse() {
+ public RpcResponse getResponse() {
return response;
}
- public void setResponse(XDR response) {
+ public void setResponse(RpcResponse response) {
this.response = response;
}
}
@@ -128,13 +128,13 @@ public class RpcCallCache {
}
/** Mark a request as completed and add corresponding response to the cache */
- public void callCompleted(InetAddress clientId, int xid, XDR response) {
+ public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
ClientRequest req = new ClientRequest(clientId, xid);
CacheEntry e;
synchronized(map) {
e = map.get(req);
}
- e.setResponse(response);
+ e.response = response;
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Wed Oct 16 21:07:28 2013
@@ -18,22 +18,24 @@
package org.apache.hadoop.oncrpc;
import java.io.IOException;
-import java.net.InetAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
-import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.oncrpc.security.Verifier;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
/**
* Class for writing RPC server programs based on RFC 1050. Extend this class
* and implement {@link #handleInternal} to handle the requests received.
*/
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
private static final Log LOG = LogFactory.getLog(RpcProgram.class);
public static final int RPCB_PORT = 111;
private final String program;
@@ -42,7 +44,6 @@ public abstract class RpcProgram {
private final int progNumber;
private final int lowProgVersion;
private final int highProgVersion;
- private final RpcCallCache rpcCallCache;
/**
* Constructor
@@ -53,19 +54,15 @@ public abstract class RpcProgram {
* @param progNumber program number as defined in RFC 1050
* @param lowProgVersion lowest version of the specification supported
* @param highProgVersion highest version of the specification supported
- * @param cacheSize size of cache to handle duplciate requests. Size <= 0
- * indicates no cache.
*/
protected RpcProgram(String program, String host, int port, int progNumber,
- int lowProgVersion, int highProgVersion, int cacheSize) {
+ int lowProgVersion, int highProgVersion) {
this.program = program;
this.host = host;
this.port = port;
this.progNumber = progNumber;
this.lowProgVersion = lowProgVersion;
this.highProgVersion = highProgVersion;
- this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
- : null;
}
/**
@@ -103,92 +100,50 @@ public abstract class RpcProgram {
}
}
- /**
- * Handle an RPC request.
- * @param rpcCall RPC call that is received
- * @param in xdr with cursor at reading the remaining bytes of a method call
- * @param out xdr output corresponding to Rpc reply
- * @param client making the Rpc request
- * @param channel connection over which Rpc request is received
- * @return response xdr response
- */
- protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel);
-
- public XDR handle(XDR xdr, InetAddress client, Channel channel) {
- XDR out = new XDR();
- RpcCall rpcCall = RpcCall.read(xdr);
- if (LOG.isDebugEnabled()) {
- LOG.debug(program + " procedure #" + rpcCall.getProcedure());
- }
-
- if (!checkProgram(rpcCall.getProgram())) {
- return programMismatch(out, rpcCall);
- }
-
- if (!checkProgramVersion(rpcCall.getVersion())) {
- return programVersionMismatch(out, rpcCall);
- }
-
- // Check for duplicate requests in the cache for non-idempotent requests
- boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
- if (idempotent) {
- CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
- if (entry != null) { // in ache
- if (entry.isCompleted()) {
- LOG.info("Sending the cached reply to retransmitted request "
- + rpcCall.getXid());
- return entry.getResponse();
- } else { // else request is in progress
- LOG.info("Retransmitted request, transaction still in progress "
- + rpcCall.getXid());
- // TODO: ignore the request?
- }
- }
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcInfo info = (RpcInfo) e.getMessage();
+ RpcCall call = (RpcCall) info.header();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(program + " procedure #" + call.getProcedure());
}
- XDR response = handleInternal(rpcCall, xdr, out, client, channel);
- if (response.size() == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No sync response, expect an async response for request XID="
- + rpcCall.getXid());
- }
+ if (this.progNumber != call.getProgram()) {
+ LOG.warn("Invalid RPC call program " + call.getProgram());
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
+ }
+
+ int ver = call.getVersion();
+ if (ver < lowProgVersion || ver > highProgVersion) {
+ LOG.warn("Invalid RPC call version " + ver);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+ AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+ XDR out = new XDR();
+ reply.write(out);
+ out.writeInt(lowProgVersion);
+ out.writeInt(highProgVersion);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+ .buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
+ return;
}
- // Add the request to the cache
- if (idempotent) {
- rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
- }
- return response;
- }
-
- private XDR programMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call program " + call.getProgram());
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_UNAVAIL, new VerifierNone());
- reply.write(out);
- return out;
- }
-
- private XDR programVersionMismatch(XDR out, RpcCall call) {
- LOG.warn("Invalid RPC call version " + call.getVersion());
- RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
- AcceptState.PROG_MISMATCH, new VerifierNone());
- reply.write(out);
- out.writeInt(lowProgVersion);
- out.writeInt(highProgVersion);
- return out;
- }
-
- private boolean checkProgram(int progNumber) {
- return this.progNumber == progNumber;
- }
-
- /** Return true if a the program version in rpcCall is supported */
- private boolean checkProgramVersion(int programVersion) {
- return programVersion >= lowProgVersion
- && programVersion <= highProgVersion;
+ handleInternal(ctx, info);
}
+
+ protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
@Override
public String toString() {
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Wed Oct 16 21:07:28 2013
@@ -17,17 +17,23 @@
*/
package org.apache.hadoop.oncrpc;
+import java.nio.ByteBuffer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
-public class RpcUtil {
+public final class RpcUtil {
/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
+ * The XID in RPC call. It is used for starting with new seed after each
+ * reboot.
*/
private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
@@ -35,10 +41,27 @@ public class RpcUtil {
return xid = ++xid + caller.hashCode();
}
+ public static void sendRpcResponse(ChannelHandlerContext ctx,
+ RpcResponse response) {
+ Channels.fireMessageReceived(ctx, response);
+ }
+
public static FrameDecoder constructRpcFrameDecoder() {
return new RpcFrameDecoder();
}
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+ public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+ /**
+ * An RPC client can separate an RPC message into several frames (i.e.,
+ * fragments) when transferring it across the wire. RpcFrameDecoder
+ * reconstructs a full RPC message from these fragments.
+ *
+ * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+ * each RPC client.
+ */
static class RpcFrameDecoder extends FrameDecoder {
public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
private ChannelBuffer currentFrame;
@@ -78,4 +101,68 @@ public class RpcUtil {
}
}
}
+
+ /**
+ * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+ * request into a RpcInfo instance.
+ */
+ static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+ private static final Log LOG = LogFactory
+ .getLog(RpcMessageParserStage.class);
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+ ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+ XDR in = new XDR(b, XDR.State.READING);
+
+ RpcInfo info = null;
+ try {
+ RpcCall callHeader = RpcCall.read(in);
+ ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+ .slice());
+ info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+ e.getRemoteAddress());
+ } catch (Exception exc) {
+ LOG.info("Malfromed RPC request from " + e.getRemoteAddress());
+ }
+
+ if (info != null) {
+ Channels.fireMessageReceived(ctx, info);
+ }
+ }
+ }
+
+ /**
+ * RpcTcpResponseStage sends an RpcResponse across the wire with the
+ * appropriate fragment header.
+ */
+ private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+ ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+ ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+ e.getChannel().write(d);
+ }
+ }
+
+ /**
+ * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+ * require a fragment header.
+ */
+ private static final class RpcUdpResponseStage extends
+ SimpleChannelUpstreamHandler {
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ RpcResponse r = (RpcResponse) e.getMessage();
+ e.getChannel().write(r.data(), r.remoteAddress());
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Wed Oct 16 21:07:28 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
public class SimpleTcpServer {
public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@@ -50,18 +50,6 @@ public class SimpleTcpServer {
this.port = port;
this.rpcProgram = program;
this.workerCount = workercount;
- this.pipelineFactory = getPipelineFactory();
- }
-
- public ChannelPipelineFactory getPipelineFactory() {
- return new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- RpcUtil.constructRpcFrameDecoder(),
- new SimpleTcpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -78,7 +66,15 @@ public class SimpleTcpServer {
}
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_TCP_RESPONSE);
+ }
+ });
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Wed Oct 16 21:07:28 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
private final int RECEIVE_BUFFER_SIZE = 65536;
protected final int port;
- protected final ChannelPipelineFactory pipelineFactory;
- protected final RpcProgram rpcProgram;
+ protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount;
- public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+ public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
- this.pipelineFactory = new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
- }
- };
}
public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
- ChannelPipeline p = b.getPipeline();
- p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+ b.setPipeline(Channels.pipeline(
+ RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+ RpcUtil.STAGE_RPC_UDP_RESPONSE));
b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Wed Oct 16 21:07:28 2013
@@ -93,6 +93,10 @@ public final class XDR {
return n;
}
+ public ByteBuffer buffer() {
+ return buf.duplicate();
+ }
+
public int size() {
// TODO: This overloading intends to be compatible with the semantics of
// the previous version of the class. This function should be separated into
@@ -219,7 +223,7 @@ public final class XDR {
return xdr.buf.remaining() >= len;
}
- private static byte[] recordMark(int size, boolean last) {
+ static byte[] recordMark(int size, boolean last) {
byte[] b = new byte[SIZEOF_INT];
ByteBuffer buf = ByteBuffer.wrap(b);
buf.putInt(!last ? size : size | 0x80000000);
@@ -259,9 +263,8 @@ public final class XDR {
@VisibleForTesting
public byte[] getBytes() {
- ByteBuffer d = buf.duplicate();
- byte[] b = new byte[d.position()];
- d.flip();
+ ByteBuffer d = asReadOnlyWrap().buffer();
+ byte[] b = new byte[d.remaining()];
d.get(b);
return b;
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/security/Verifier.java Wed Oct 16 21:07:28 2013
@@ -18,16 +18,17 @@
package org.apache.hadoop.oncrpc.security;
import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.RpcAuthInfo.AuthFlavor;
/**
* Base class for verifier. Currently our authentication only supports 3 types
- * of auth flavors: {@link AuthFlavor#AUTH_NONE}, {@link AuthFlavor#AUTH_SYS},
- * and {@link AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
+ * of auth flavors: {@link RpcAuthInfo.AuthFlavor#AUTH_NONE}, {@link RpcAuthInfo.AuthFlavor#AUTH_SYS},
+ * and {@link RpcAuthInfo.AuthFlavor#RPCSEC_GSS}. Thus for verifier we only need to handle
* AUTH_NONE and RPCSEC_GSS
*/
public abstract class Verifier extends RpcAuthInfo {
+ public static final Verifier VERIFIER_NONE = new VerifierNone();
+
protected Verifier(AuthFlavor flavor) {
super(flavor);
}
@@ -61,6 +62,4 @@ public abstract class Verifier extends R
}
verifier.write(xdr);
}
-
-
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Wed Oct 16 21:07:28 2013
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.portmap;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
@@ -26,10 +25,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* An rpcbind request handler.
@@ -44,7 +48,7 @@ public class RpcProgramPortmap extends R
private final HashMap<String, PortmapMapping> map;
public RpcProgramPortmap() {
- super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+ super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
map = new HashMap<String, PortmapMapping>(256);
}
@@ -130,10 +134,15 @@ public class RpcProgramPortmap extends R
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR in = new XDR(data);
+ XDR out = new XDR();
+
if (portmapProc == Procedure.PMAPPROC_NULL) {
out = nullOp(xid, in, out);
} else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -148,11 +157,14 @@ public class RpcProgramPortmap extends R
out = getport(xid, in, out);
} else {
LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
- RpcAcceptedReply.getInstance(xid,
- RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
- out);
+ RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+ reply.write(out);
}
- return out;
+
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Wed Oct 16 21:07:28 2013
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
@@ -30,6 +29,7 @@ import org.apache.hadoop.oncrpc.security
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.junit.Test;
@@ -38,7 +38,7 @@ import org.mockito.Mockito;
public class TestFrameDecoder {
private static int port = 12345; // some random server port
- private static XDR result = null;
+ private static int resultSize;
static void testRequest(XDR request) {
SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -49,18 +49,20 @@ public class TestFrameDecoder {
static class TestRpcProgram extends RpcProgram {
protected TestRpcProgram(String program, String host, int port,
- int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
- super(program, host, port, progNumber, lowProgVersion, highProgVersion,
- cacheSize);
+ int progNumber, int lowProgVersion, int highProgVersion) {
+ super(program, host, port, progNumber, lowProgVersion, highProgVersion);
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
- InetAddress client, Channel channel) {
- // Get the final complete request and return a void response.
- result = in;
- RpcAcceptedReply.getAcceptInstance(1234, new VerifierNone()).write(out);
- return out;
+ protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ resultSize = info.data().readableBytes();
+ RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+ new VerifierNone());
+ XDR out = new XDR();
+ reply.write(out);
+ ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
@@ -147,21 +149,22 @@ public class TestFrameDecoder {
public void testFrames() {
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
- "localhost", port, 100000, 1, 2, 100);
+ "localhost", port, 100000, 1, 2);
SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
tcpServer.run();
XDR xdrOut = createGetportMount();
+ int headerSize = xdrOut.size();
int bufsize = 2 * 1024 * 1024;
byte[] buffer = new byte[bufsize];
xdrOut.writeFixedOpaque(buffer);
- int requestSize = xdrOut.size();
+ int requestSize = xdrOut.size() - headerSize;
// Send the request to the server
testRequest(xdrOut);
// Verify the server got the request with right size
- assertTrue(requestSize == result.size());
+ assertEquals(requestSize, resultSize);
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
@@ -173,10 +176,6 @@ public class TestFrameDecoder {
static XDR createGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
- xdr_out.writeInt(0); // AUTH_NULL
- xdr_out.writeInt(0); // cred len
- xdr_out.writeInt(0); // verifier AUTH_NULL
- xdr_out.writeInt(0); // verf len
return xdr_out;
}
/*
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Wed Oct 16 21:07:28 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
import org.junit.Test;
+import static org.mockito.Mockito.*;
+
/**
* Unit tests for {@link RpcCallCache}
*/
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
validateInprogressCacheEntry(e);
// Set call as completed
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
cache.callCompleted(clientIp, xid, response);
e = cache.checkOrAddToCache(clientIp, xid);
validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
assertNull(c.getResponse());
}
- private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+ private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
assertFalse(c.isInProgress());
assertTrue(c.isCompleted());
assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
assertFalse(c.isCompleted());
assertNull(c.getResponse());
- XDR response = new XDR();
+ RpcResponse response = mock(RpcResponse.class);
c.setResponse(response);
validateCompletedCacheEntry(c, response);
}