Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/01/25 00:22:01 UTC

svn commit: r1235548 [7/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import junit.framework.TestCase;
+
+public class TestMRSequenceFileAsBinaryInputFormat extends TestCase {
+  private static final int RECORDS = 10000;
+
+  public void testBinary() throws IOException, InterruptedException {
+    Job job = new Job();
+    FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    fs.delete(dir, true);
+    FileInputFormat.setInputPaths(job, dir);
+
+    Text tkey = new Text();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+      job.getConfiguration(), file, Text.class, Text.class);
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    InputFormat<BytesWritable,BytesWritable> bformat =
+      new SequenceFileAsBinaryInputFormat();
+
+    int count = 0;
+    r.setSeed(seed);
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+    Text cmpkey = new Text();
+    Text cmpval = new Text();
+    DataInputBuffer buf = new DataInputBuffer();
+    FileInputFormat.setInputPaths(job, file);
+    for (InputSplit split : bformat.getSplits(job)) {
+      RecordReader<BytesWritable, BytesWritable> reader =
+            bformat.createRecordReader(split, context);
+      MapContext<BytesWritable, BytesWritable, BytesWritable, BytesWritable> 
+        mcontext = new MapContext<BytesWritable, BytesWritable,
+          BytesWritable, BytesWritable>(job.getConfiguration(), 
+          context.getTaskAttemptID(), reader, null, null, 
+          MapReduceTestUtil.createDummyReporter(), 
+          split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          bkey = reader.getCurrentKey();
+          bval = reader.getCurrentValue();
+          tkey.set(Integer.toString(r.nextInt(), 36));
+          tval.set(Long.toString(r.nextLong(), 36));
+          buf.reset(bkey.getBytes(), bkey.getLength());
+          cmpkey.readFields(buf);
+          buf.reset(bval.getBytes(), bval.getLength());
+          cmpval.readFields(buf);
+          assertTrue(
+            "Keys don't match: " + "*" + cmpkey.toString() + ":" +
+            tkey.toString() + "*",
+            cmpkey.toString().equals(tkey.toString()));
+          assertTrue(
+            "Vals don't match: " + "*" + cmpval.toString() + ":" +
+            tval.toString() + "*",
+            cmpval.toString().equals(tval.toString()));
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileAsTextInputFormat extends TestCase {
+  private static int MAX_LENGTH = 10000;
+  private static Configuration conf = new Configuration();
+
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "test.seq");
+    
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    fs.delete(dir, true);
+
+    FileInputFormat.setInputPaths(job, dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+      // create a file with length entries
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, conf, file,
+          IntWritable.class, LongWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          LongWritable value = new LongWritable(10 * i);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      // try splitting the file in a variety of sizes
+      InputFormat<Text, Text> format =
+        new SequenceFileAsTextInputFormat();
+      
+      for (int i = 0; i < 3; i++) {
+        // check each split
+        BitSet bits = new BitSet(length);
+        int numSplits =
+          random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+        FileInputFormat.setMaxInputSplitSize(job, 
+          fs.getFileStatus(file).getLen() / numSplits);
+        for (InputSplit split : format.getSplits(job)) {
+          RecordReader<Text, Text> reader =
+            format.createRecordReader(split, context);
+          MapContext<Text, Text, Text, Text> mcontext = 
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(), 
+            context.getTaskAttemptID(), reader, null, null, 
+            MapReduceTestUtil.createDummyReporter(), 
+            split);
+          reader.initialize(split, mcontext);
+          Class<?> readerClass = reader.getClass();
+          assertEquals("reader class is SequenceFileAsTextRecordReader.",
+            SequenceFileAsTextRecordReader.class, readerClass);        
+          Text key;
+          try {
+            int count = 0;
+            while (reader.nextKeyValue()) {
+              key = reader.getCurrentKey();
+              int keyInt = Integer.parseInt(key.toString());
+              assertFalse("Key in multiple partitions.", bits.get(keyInt));
+              bits.set(keyInt);
+              count++;
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestMRSequenceFileAsTextInputFormat().testFormat();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileInputFilter extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestMRSequenceFileInputFilter.class.getName());
+
+  private static final int MAX_LENGTH = 15000;
+  private static final Configuration conf = new Configuration();
+  private static final Job job;
+  private static final FileSystem fs;
+  private static final Path inDir = 
+    new Path(System.getProperty("test.build.data",".") + "/mapred");
+  private static final Path inFile = new Path(inDir, "test.seq");
+  private static final Random random = new Random(1);
+  
+  static {
+    try {
+      job = new Job(conf);
+      FileInputFormat.setInputPaths(job, inDir);
+      fs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void createSequenceFile(int numRecords) throws Exception {
+    // create a file with length entries
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, inFile,
+                                Text.class, BytesWritable.class);
+    try {
+      for (int i = 1; i <= numRecords; i++) {
+        Text key = new Text(Integer.toString(i));
+        byte[] data = new byte[random.nextInt(10)];
+        random.nextBytes(data);
+        BytesWritable value = new BytesWritable(data);
+        writer.append(key, value);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+
+  private int countRecords(int numSplits) 
+      throws IOException, InterruptedException {
+    InputFormat<Text, BytesWritable> format =
+      new SequenceFileInputFilter<Text, BytesWritable>();
+    if (numSplits == 0) {
+      numSplits =
+        random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+    }
+    FileInputFormat.setMaxInputSplitSize(job, 
+      fs.getFileStatus(inFile).getLen() / numSplits);
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    // check each split
+    int count = 0;
+    for (InputSplit split : format.getSplits(job)) {
+      RecordReader<Text, BytesWritable> reader =
+        format.createRecordReader(split, context);
+      MapContext<Text, BytesWritable, Text, BytesWritable> mcontext = 
+        new MapContext<Text, BytesWritable, Text, BytesWritable>(
+        job.getConfiguration(), 
+        context.getTaskAttemptID(), reader, null, null, 
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          LOG.info("Accept record " + reader.getCurrentKey().toString());
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    return count;
+  }
+  
+  public void testRegexFilter() throws Exception {
+    // set the filter class
+    LOG.info("Testing Regex Filter with patter: \\A10*");
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.RegexFilter.class);
+    SequenceFileInputFilter.RegexFilter.setPattern(
+      job.getConfiguration(), "\\A10*");
+    
+    // clean input dir
+    fs.delete(inDir, true);
+  
+    // for a variety of lengths
+    for (int length = 1; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      int count = countRecords(0);
+      assertEquals(count, length==0 ? 0 : (int)Math.log10(length) + 1);
+    }
+    
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public void testPercentFilter() throws Exception {
+    LOG.info("Testing Percent Filter with frequency: 1000");
+    // set the filter class
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.PercentFilter.class);
+    SequenceFileInputFilter.PercentFilter.setFrequency(
+      job.getConfiguration(), 1000);
+      
+    // clean input dir
+    fs.delete(inDir, true);
+    
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: "+length);
+      createSequenceFile(length);
+      int count = countRecords(1);
+      LOG.info("Accepted " + count + " records");
+      int expectedCount = length / 1000;
+      if (expectedCount * 1000 != length)
+        expectedCount++;
+      assertEquals(count, expectedCount);
+    }
+      
+    // clean up
+    fs.delete(inDir, true);
+  }
+  
+  public void testMD5Filter() throws Exception {
+    // set the filter class
+    LOG.info("Testing MD5 Filter with frequency: 1000");
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.MD5Filter.class);
+    SequenceFileInputFilter.MD5Filter.setFrequency(
+      job.getConfiguration(), 1000);
+      
+    // clean input dir
+    fs.delete(inDir, true);
+    
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      LOG.info("Accepted " + countRecords(0) + " records");
+    }
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestMRSequenceFileInputFilter filter = new TestMRSequenceFileInputFilter();
+    filter.testRegexFilter();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * @see TestDelegatingInputFormat
+ */
+public class TestMultipleInputs extends TestCase {
+  
+  public void testAddInputPathWithFormat() throws IOException {
+    final Job job = new Job();
+    MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class);
+    MultipleInputs.addInputPath(job, new Path("/bar"),
+        KeyValueTextInputFormat.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(new JobContext(job.getConfiguration(), new JobID()));
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+  }
+
+  public void testAddInputPathWithMapper() throws IOException {
+    final Job job = new Job();
+    MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class,
+       MapClass.class);
+    MultipleInputs.addInputPath(job, new Path("/bar"),
+       KeyValueTextInputFormat.class, MapClass2.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(job);
+    final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
+       .getMapperTypeMap(job);
+
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+    assertEquals(MapClass.class, maps.get(new Path("/foo")));
+    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+  }
+
+  static class MapClass extends Mapper<String, String, String, String> {
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+public class TestNLineInputFormat extends TestCase {
+  private static int MAX_LENGTH = 200;
+  
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs = null; 
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestNLineInputFormat");
+  
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  void checkFormat(Job job, int expectedN) 
+      throws IOException, InterruptedException {
+    NLineInputFormat format = new NLineInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    // check all splits except last one
+    int count = 0;
+    for (int i = 0; i < splits.size() -1; i++) {
+      assertEquals("There are no split locations", 0,
+                   splits.get(i).getLocations().length);
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      RecordReader<LongWritable, Text> reader = format.createRecordReader(
+        splits.get(i), context);
+      Class<?> clazz = reader.getClass();
+      assertEquals("reader class is LineRecordReader.", 
+        LineRecordReader.class, clazz);
+      MapContext<LongWritable, Text, LongWritable, Text> mcontext = 
+        new MapContext<LongWritable, Text, LongWritable, Text>(
+          job.getConfiguration(), context.getTaskAttemptID(), reader, null,
+          null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
+      reader.initialize(splits.get(i), mcontext);
+         
+      try {
+        count = 0;
+        while (reader.nextKeyValue()) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("number of lines in split is " + expectedN ,
+                   expectedN, count);
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestNLineInputFormat().testFormat();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import static org.junit.Assert.assertFalse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ */
+public class TestControlledJob {
+  
+  @Test
+  public void testAddingDependingJobToRunningJobFails() throws Exception {
+    Configuration conf = new Configuration();
+    ControlledJob job1 = new ControlledJob(conf);
+    job1.setJobState(ControlledJob.State.RUNNING);
+    assertFalse(job1.addDependingJob(new ControlledJob(conf)));
+  }
+
+  @Test
+  public void testAddingDependingJobToCompletedJobFails() throws Exception {
+    Configuration conf = new Configuration();
+    ControlledJob job1 = new ControlledJob(conf);
+    job1.setJobState(ControlledJob.State.SUCCESS);
+    assertFalse(job1.addDependingJob(new ControlledJob(conf)));
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestControlledJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+/**
+ * Tests the JobControl API using mock and stub Job instances.
+ */
+public class TestMapReduceJobControlWithMocks {
+
+  @Test
+  public void testSuccessfulJobs() throws Exception {
+    JobControl jobControl = new JobControl("Test");
+    
+    ControlledJob job1 = createSuccessfulControlledJob(jobControl);
+    ControlledJob job2 = createSuccessfulControlledJob(jobControl);
+    ControlledJob job3 = createSuccessfulControlledJob(jobControl, job1, job2);
+    ControlledJob job4 = createSuccessfulControlledJob(jobControl, job3);
+    
+    runJobControl(jobControl);
+    
+    assertEquals("Success list", 4, jobControl.getSuccessfulJobList().size());
+    assertEquals("Failed list", 0, jobControl.getFailedJobList().size());
+    
+    assertTrue(job1.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(job2.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(job3.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(job4.getJobState() == ControlledJob.State.SUCCESS);
+    
+    jobControl.stop();
+  }
+  
+  @Test
+  public void testFailedJob() throws Exception {
+    JobControl jobControl = new JobControl("Test");
+    
+    ControlledJob job1 = createFailedControlledJob(jobControl);
+    ControlledJob job2 = createSuccessfulControlledJob(jobControl);
+    ControlledJob job3 = createSuccessfulControlledJob(jobControl, job1, job2);
+    ControlledJob job4 = createSuccessfulControlledJob(jobControl, job3);
+    
+    runJobControl(jobControl);
+    
+    assertEquals("Success list", 1, jobControl.getSuccessfulJobList().size());
+    assertEquals("Failed list", 3, jobControl.getFailedJobList().size());
+
+    assertTrue(job1.getJobState() == ControlledJob.State.FAILED);
+    assertTrue(job2.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(job3.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+    assertTrue(job4.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+    
+    jobControl.stop();
+  }
+  
+  @Test
+  public void testKillJob() throws Exception {
+    JobControl jobControl = new JobControl("Test");
+    
+    ControlledJob job = createFailedControlledJob(jobControl);
+    
+    job.killJob();
+
+    // Verify that killJob() was called on the mock Job
+    verify(job.getJob()).killJob();
+  }
+  
+  private Job createJob(boolean complete, boolean successful)
+  	throws IOException, InterruptedException {
+    // Create a stub Job that responds in a controlled way
+    Job mockJob = mock(Job.class);
+    when(mockJob.getConfiguration()).thenReturn(new Configuration());
+    when(mockJob.isComplete()).thenReturn(complete);
+    when(mockJob.isSuccessful()).thenReturn(successful);
+    return mockJob;
+  }
+  
+  private ControlledJob createControlledJob(JobControl jobControl,
+      	boolean successful, ControlledJob... dependingJobs)
+      	throws IOException, InterruptedException {
+    List<ControlledJob> dependingJobsList = dependingJobs == null ? null :
+      Arrays.asList(dependingJobs);
+    ControlledJob job = new ControlledJob(createJob(true, successful),
+	dependingJobsList);
+    jobControl.addJob(job);
+    return job;
+  }
+  
+  private ControlledJob createSuccessfulControlledJob(JobControl jobControl,
+      ControlledJob... dependingJobs) throws IOException, InterruptedException {
+    return createControlledJob(jobControl, true, dependingJobs);
+  }
+
+  private ControlledJob createFailedControlledJob(JobControl jobControl,
+      ControlledJob... dependingJobs) throws IOException, InterruptedException {
+    return createControlledJob(jobControl, false, dependingJobs);
+  }
+
+  private void runJobControl(JobControl jobControl) {
+    Thread controller = new Thread(jobControl);
+    controller.start();
+    waitTillAllFinished(jobControl);
+  }
+
+  private void waitTillAllFinished(JobControl jobControl) {
+    while (!jobControl.allFinished()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+	// ignore
+      }
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestMRSequenceFileAsBinaryOutputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMRSequenceFileAsBinaryOutputFormat.class.getName());
+
+  private static final int RECORDS = 10000;
+  
+  public void testBinary() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    
+    Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
+                    "outseq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    FileOutputFormat.setOutputPath(job, outdir);
+
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+                                          IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+                                          DoubleWritable.class ); 
+
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+                                                       CompressionType.BLOCK);
+
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+
+    TaskAttemptContext context = 
+      MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
+    OutputFormat<BytesWritable, BytesWritable> outputFormat = 
+      new SequenceFileAsBinaryOutputFormat();
+    OutputCommitter committer = outputFormat.getOutputCommitter(context);
+    committer.setupJob(job);
+    RecordWriter<BytesWritable, BytesWritable> writer = outputFormat.
+      getRecordWriter(context);
+
+    IntWritable iwritable = new IntWritable();
+    DoubleWritable dwritable = new DoubleWritable();
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        iwritable = new IntWritable(r.nextInt());
+        iwritable.write(outbuf);
+        bkey.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        dwritable = new DoubleWritable(r.nextDouble());
+        dwritable.write(outbuf);
+        bval.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        writer.write(bkey, bval);
+      }
+    } finally {
+      writer.close(context);
+    }
+    committer.commitTask(context);
+    committer.commitJob(job);
+
+    InputFormat<IntWritable, DoubleWritable> iformat =
+      new SequenceFileInputFormat<IntWritable, DoubleWritable>();
+    int count = 0;
+    r.setSeed(seed);
+    SequenceFileInputFormat.setInputPaths(job, outdir);
+    LOG.info("Reading data by SequenceFileInputFormat");
+    for (InputSplit split : iformat.getSplits(job)) {
+      RecordReader<IntWritable, DoubleWritable> reader =
+        iformat.createRecordReader(split, context);
+      MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable> 
+        mcontext = new MapContext<IntWritable, DoubleWritable,
+          BytesWritable, BytesWritable>(job.getConfiguration(), 
+          context.getTaskAttemptID(), reader, null, null, 
+          MapReduceTestUtil.createDummyReporter(), 
+          split);
+      reader.initialize(split, mcontext);
+      try {
+        int sourceInt;
+        double sourceDouble;
+        while (reader.nextKeyValue()) {
+          sourceInt = r.nextInt();
+          sourceDouble = r.nextDouble();
+          iwritable = reader.getCurrentKey();
+          dwritable = reader.getCurrentValue();
+          assertEquals(
+              "Keys don't match: " + "*" + iwritable.get() + ":" + 
+                                           sourceInt + "*",
+              sourceInt, iwritable.get());
+          assertTrue(
+              "Vals don't match: " + "*" + dwritable.get() + ":" +
+                                           sourceDouble + "*",
+              Double.compare(dwritable.get(), sourceDouble) == 0 );
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+  public void testSequenceOutputClassDefaultsToMapRedOutputClass() 
+         throws IOException {
+    Job job = new Job();
+    // Set arbitrary output key/value classes to test getSequenceFileOutput{Key,Value}Class
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(BooleanWritable.class);
+
+    assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass", 
+      FloatWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+    assertEquals("SequenceFileOutputValueClass should default to " 
+      + "ouputValueClass", 
+      BooleanWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+      IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+      DoubleWritable.class ); 
+
+    assertEquals("SequenceFileOutputKeyClass not updated", 
+      IntWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+    assertEquals("SequenceFileOutputValueClass not updated", 
+      DoubleWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+  }
+
+  public void testcheckOutputSpecsForbidRecordCompression() 
+      throws IOException {
+    Job job = new Job();
+    FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+    Path outputdir = new Path(System.getProperty("test.build.data", "/tmp") 
+                              + "/output");
+    fs.delete(outputdir, true);
+
+    // Without an output path, FileOutputFormat.checkOutputSpecs will throw
+    // InvalidJobConfException
+    FileOutputFormat.setOutputPath(job, outputdir);
+
+    // SequenceFileAsBinaryOutputFormat doesn't support record compression
+    // It should throw an exception when checked by checkOutputSpecs
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+      CompressionType.BLOCK);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+    } catch (Exception e) {
+      fail("Block compression should be allowed for " 
+        + "SequenceFileAsBinaryOutputFormat:Caught " + e.getClass().getName());
+    }
+
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+      CompressionType.RECORD);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+      fail("Record compression should not be allowed for " 
+        + "SequenceFileAsBinaryOutputFormat");
+    } catch (InvalidJobConfException ie) {
+      // expected
+    } catch (Exception e) {
+      fail("Expected " + InvalidJobConfException.class.getName() 
+        + "but caught " + e.getClass().getName() );
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+public class TestMultipleOutputs extends HadoopTestCase {
+
+  public TestMultipleOutputs() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  public void testWithoutCounters() throws Exception {
+    _testMultipleOutputs(false);
+  }
+
+  public void testWithCounters() throws Exception {
+    _testMultipleOutputs(true);
+  }
+
+  private static final Path ROOT_DIR = new Path("testing/mo");
+  private static final Path IN_DIR = new Path(ROOT_DIR, "input");
+  private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
+
+  private Path getDir(Path dir) {
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      dir = new Path(localPathRoot, dir);
+    }
+    return dir;
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    Path rootDir = getDir(ROOT_DIR);
+    Path inDir = getDir(IN_DIR);
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(rootDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+  }
+
+  public void tearDown() throws Exception {
+    Path rootDir = getDir(ROOT_DIR);
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(rootDir, true);
+    super.tearDown();
+  }
+
+  protected void _testMultipleOutputs(boolean withCounters) throws Exception {
+    Path inDir = getDir(IN_DIR);
+    Path outDir = getDir(OUT_DIR);
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    file = fs.create(new Path(inDir, "part-1"));
+    file.writeBytes("a\nb\n\nc\nd\ne");
+    file.close();
+
+    Job job = new Job(conf);
+    job.setJobName("mo");
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+
+    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+      LongWritable.class, Text.class);
+
+    MultipleOutputs.setCountersEnabled(job, withCounters);
+
+    job.setMapperClass(MOMap.class);
+    job.setReducerClass(MOReduce.class);
+
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    job.waitForCompletion(false);
+
+    // assert number of named output part files
+    int namedOutputCount = 0;
+    FileStatus[] statuses = fs.listStatus(outDir);
+    
+    for (FileStatus status : statuses) {
+      if (status.getPath().getName().equals("text-m-00000") ||
+        status.getPath().getName().equals("text-m-00001") ||
+        status.getPath().getName().equals("text-r-00000")) {
+        namedOutputCount++;
+      }
+    }
+    assertEquals(3, namedOutputCount);
+
+    // assert TextOutputFormat files correctness
+    JobContext jobContext = new JobContext(job.getConfiguration(), new JobID());
+    BufferedReader reader = new BufferedReader(
+      new InputStreamReader(fs.open(
+        new Path(FileOutputFormat.getOutputPath(jobContext), "text-r-00000"))));
+    int count = 0;
+    String line = reader.readLine();
+    while (line != null) {
+      assertTrue(line.endsWith("text"));
+      line = reader.readLine();
+      count++;
+    }
+    reader.close();
+    assertFalse(count == 0);
+
+    CounterGroup counters =
+      job.getCounters().getGroup(MultipleOutputs.class.getName());
+    if (!withCounters) {
+      assertEquals(0, counters.size());
+    }
+    else {
+      assertEquals(1, counters.size());
+      assertEquals(4, counters.findCounter("text").getValue());
+    }
+
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOMap extends Mapper<LongWritable, Text, LongWritable,
+    Text> {
+
+    private MultipleOutputs mos;
+
+    @Override
+    protected void setup(Context context) {
+      mos = new MultipleOutputs(context);
+    }
+    
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+      throws IOException, InterruptedException {
+      if (!value.toString().equals("a")) {
+        context.write(key, value);
+      } else {
+        mos.write("text", key, new Text("text"));
+      }
+    }
+    
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
+
+  @SuppressWarnings({"unchecked"})
+  public static class MOReduce extends Reducer<LongWritable, Text,
+    LongWritable, Text> {
+
+    private MultipleOutputs mos;
+
+    @Override
+    protected void setup(Context context) {
+      mos = new MultipleOutputs(context);
+    }
+
+    @Override
+    public void reduce(LongWritable key, Iterable<Text> values, Context context)
+      throws IOException, InterruptedException {
+      for (Text value : values) {
+        if (!value.toString().equals("b")) {
+          context.write(key, value);
+        } else {
+          mos.write("text", key, new Text("text"));
+        }
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      mos.close();
+    }
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/output/TestMultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestBinaryPartitioner extends TestCase {
+
+  public void testDefaultOffsets() {
+    Configuration conf = new Configuration();
+    BinaryPartitioner<?> partitioner = 
+      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    
+    BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 });
+    int partition1 = partitioner.getPartition(key1, null, 10);
+    int partition2 = partitioner.getPartition(key2, null, 10);
+    assertEquals(partition1, partition2);
+    
+    key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
+    partition1 = partitioner.getPartition(key1, null, 10);
+    partition2 = partitioner.getPartition(key2, null, 10);
+    assertTrue(partition1 != partition2);
+    
+    key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
+    partition1 = partitioner.getPartition(key1, null, 10);
+    partition2 = partitioner.getPartition(key2, null, 10);
+    assertTrue(partition1 != partition2);
+  }
+  
+  public void testCustomOffsets() {
+    Configuration conf = new Configuration();
+    BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 7, 8 });
+    
+    BinaryPartitioner.setOffsets(conf, 1, -3);
+    BinaryPartitioner<?> partitioner = 
+      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    int partition1 = partitioner.getPartition(key1, null, 10);
+    int partition2 = partitioner.getPartition(key2, null, 10);
+    assertEquals(partition1, partition2);
+    
+    BinaryPartitioner.setOffsets(conf, 1, 2);
+    partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    partition1 = partitioner.getPartition(key1, null, 10);
+    partition2 = partitioner.getPartition(key2, null, 10);
+    assertEquals(partition1, partition2);
+    
+    BinaryPartitioner.setOffsets(conf, -4, -3);
+    partitioner = ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    partition1 = partitioner.getPartition(key1, null, 10);
+    partition2 = partitioner.getPartition(key2, null, 10);
+    assertEquals(partition1, partition2);
+  }
+  
+  public void testLowerBound() {
+    Configuration conf = new Configuration();
+    BinaryPartitioner.setLeftOffset(conf, 0);
+    BinaryPartitioner<?> partitioner = 
+      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    BinaryComparable key2 = new BytesWritable(new byte[] { 6, 2, 3, 4, 5 });
+    int partition1 = partitioner.getPartition(key1, null, 10);
+    int partition2 = partitioner.getPartition(key2, null, 10);
+    assertTrue(partition1 != partition2);
+  }
+  
+  public void testUpperBound() {
+    Configuration conf = new Configuration();
+    BinaryPartitioner.setRightOffset(conf, 4);
+    BinaryPartitioner<?> partitioner = 
+      ReflectionUtils.newInstance(BinaryPartitioner.class, conf);
+    BinaryComparable key1 = new BytesWritable(new byte[] { 1, 2, 3, 4, 5 }); 
+    BinaryComparable key2 = new BytesWritable(new byte[] { 1, 2, 3, 4, 6 });
+    int partition1 = partitioner.getPartition(key1, null, 10);
+    int partition2 = partitioner.getPartition(key2, null, 10);
+    assertTrue(partition1 != partition2);
+  }
+  
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestBinaryPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native

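BinaryPartitioner, which the test above covers, partitions on a configurable byte
range of each BinaryComparable key; negative offsets count back from the end of the
key. A minimal wiring sketch, assuming BytesWritable map output keys (the job name
and offsets are illustrative only):

    Job job = new Job(new Configuration(), "binary-partitioner-example");
    job.setMapOutputKeyClass(BytesWritable.class);
    job.setPartitionerClass(BinaryPartitioner.class);
    // partition on byte 1 through the third-from-last byte of each key,
    // the same offsets exercised by testCustomOffsets above
    BinaryPartitioner.setOffsets(job.getConfiguration(), 1, -3);
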
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestInputSampler {
+
+  static class SequentialSplit extends InputSplit {
+    private int i;
+    SequentialSplit(int i) {
+      this.i = i;
+    }
+    public long getLength() { return 0; }
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+  }
+
+  static class TestInputSamplerIF
+      extends InputFormat<IntWritable,NullWritable> {
+
+    final int maxDepth;
+    final ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+    TestInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      this.maxDepth = maxDepth;
+      assert splitInit.length == numSplits;
+      for (int i = 0; i < numSplits; ++i) {
+        splits.add(new SequentialSplit(splitInit[i]));
+      }
+    }
+
+    public List<InputSplit> getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      return splits;
+    }
+
+    public RecordReader<IntWritable,NullWritable> createRecordReader(
+        final InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      return new RecordReader<IntWritable,NullWritable>() {
+        private int maxVal;
+        private final IntWritable i = new IntWritable();
+        public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+          i.set(((SequentialSplit)split).getInit() - 1);
+          maxVal = i.get() + maxDepth + 1;
+        }
+        public boolean nextKeyValue() {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        public IntWritable getCurrentKey() { return i; }
+        public NullWritable getCurrentValue() { return NullWritable.get(); }
+        public float getProgress() { return 1.0f; }
+        public void close() { }
+      };
+    }
+
+  }
+
+  /**
+   * Verify the SplitSampler contract: an equal number of records is taken
+   * from each of the first splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.SplitSampler<IntWritable,NullWritable>(
+          NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Job ignored = new Job();
+    Object[] samples = sampler.getSample(
+        new TestInputSamplerIF(100000, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
+   * Verify the IntervalSampler contract: samples are taken at regular
+   * intervals from the given splits.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    InputSampler.Sampler<IntWritable,NullWritable> sampler =
+      new InputSampler.IntervalSampler<IntWritable,NullWritable>(
+          FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = new Job();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i, ((IntWritable)samples[i]).get());
+    }
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
------------------------------------------------------------------------------
    svn:eol-style = native

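In normal use these samplers are not invoked directly as the test does; they are
passed to InputSampler.writePartitionFile() to build the split points consumed by
TotalOrderPartitioner. A rough sketch of that wiring, with an illustrative sample
count and a made-up partition-file path:

    Job job = new Job(new Configuration(), "total-order-example");
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
        new Path("/tmp/_partitions"));                       // hypothetical path
    // take up to 10,000 samples, drawn from at most 10 splits, as SplitSampler does
    InputSampler.Sampler<IntWritable, NullWritable> sampler =
        new InputSampler.SplitSampler<IntWritable, NullWritable>(10000, 10);
    InputSampler.writePartitionFile(job, sampler);
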
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+public class TestKeyFieldHelper extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestKeyFieldHelper.class);
+  /**
+   * Test KeyFieldHelper's parseOption().
+   */
+  public void testparseOption() throws Exception {
+    KeyFieldHelper helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    String keySpecs = "-k1.2,3.4";
+    String eKeySpecs = keySpecs;
+    helper.parseOption(keySpecs);
+    String actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    // test -k a.b
+    keySpecs = "-k 1.2";
+    eKeySpecs = "-k1.2,0.0";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr -k1.2,3.4";
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr -k1.2,3.4n";
+    eKeySpecs = "-k1.2,3.4n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr -k1.2,3.4r";
+    eKeySpecs = "-k1.2,3.4r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr -k1.2,3.4 -k5.6,7.8n -k9.10,11.12r -k13.14,15.16nr";
+    //1st
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    // 2nd
+    eKeySpecs = "-k5.6,7.8n";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    //3rd
+    eKeySpecs = "-k9.10,11.12r";
+    actKeySpecs = helper.keySpecs().get(2).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    //4th
+    eKeySpecs = "-k13.14,15.16nr";
+    actKeySpecs = helper.keySpecs().get(3).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2n,3.4";
+    eKeySpecs = "-k1.2,3.4n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2r,3.4";
+    eKeySpecs = "-k1.2,3.4r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2nr,3.4";
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4n";
+    eKeySpecs = "-k1.2,3.4n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4r";
+    eKeySpecs = "-k1.2,3.4r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4nr";
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr -k1.2,3.4 -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8nr";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-n -k1.2,3.4 -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8n";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-r -k1.2,3.4 -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8r";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4n -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4r -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-k1.2,3.4nr -k5.6,7.8";
+    eKeySpecs = "-k1.2,3.4nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    eKeySpecs = "-k5.6,7.8";
+    actKeySpecs = helper.keySpecs().get(1).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-n";
+    eKeySpecs = "-k1.1,0.0n";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-r";
+    eKeySpecs = "-k1.1,0.0r";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+    
+    keySpecs = "-nr";
+    eKeySpecs = "-k1.1,0.0nr";
+    helper = new KeyFieldHelper();
+    helper.parseOption(keySpecs);
+    actKeySpecs = helper.keySpecs().get(0).toString();
+    assertEquals("KeyFieldHelper's parsing is garbled", eKeySpecs, actKeySpecs);
+  }
+  
+  /**
+   * Test KeyFieldHelper's getWordLengths().
+   */
+  public void testGetWordLengths() throws Exception {
+    KeyFieldHelper helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    // test getWordLengths with unspecified key-specifications
+    String input = "hi";
+    int[] result = helper.getWordLengths(input.getBytes(), 0, 2);
+    assertTrue(equals(result, new int[] {1}));
+    
+    // set the key specs
+    helper.setKeyFieldSpec(1, 2);
+    
+    // test getWordLengths with 3 words
+    input = "hi\thello there";
+    result = helper.getWordLengths(input.getBytes(), 0, input.length());
+    assertTrue(equals(result, new int[] {2, 2, 11}));
+    
+    // test getWordLengths with 4 words but with a different separator
+    helper.setKeyFieldSeparator(" ");
+    input = "hi hello\tthere you";
+    result = helper.getWordLengths(input.getBytes(), 0, input.length());
+    assertTrue(equals(result, new int[] {3, 2, 11, 3}));
+    
+    // test with non zero start index
+    input = "hi hello there you where me there";
+    //                 .....................
+    result = helper.getWordLengths(input.getBytes(), 10, 33);
+    assertTrue(equals(result, new int[] {5, 4, 3, 5, 2, 3}));
+    
+    input = "hi hello there you where me ";
+    //                 ..................
+    result = helper.getWordLengths(input.getBytes(), 10, input.length());
+    assertTrue(equals(result, new int[] {5, 4, 3, 5, 2, 0}));
+    
+    input = "";
+    result = helper.getWordLengths(input.getBytes(), 0, 0);
+    assertTrue(equals(result, new int[] {1, 0}));
+    
+    input = "  abc";
+    result = helper.getWordLengths(input.getBytes(), 0, 5);
+    assertTrue(equals(result, new int[] {3, 0, 0, 3}));
+    
+    input = "  abc";
+    result = helper.getWordLengths(input.getBytes(), 0, 2);
+    assertTrue(equals(result, new int[] {3, 0, 0, 0}));
+    
+    input = " abc ";
+    result = helper.getWordLengths(input.getBytes(), 0, 2);
+    assertTrue(equals(result, new int[] {2, 0, 1}));
+    
+    helper.setKeyFieldSeparator("abcd");
+    input = "abc";
+    result = helper.getWordLengths(input.getBytes(), 0, 3);
+    assertTrue(equals(result, new int[] {1, 3}));
+  }
+  
+  /**
+   * Test KeyFieldHelper's getStartOffset() and getEndOffset().
+   */
+  public void testgetStartEndOffset() throws Exception {
+    KeyFieldHelper helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    // test getStartOffset with -k1,2
+    helper.setKeyFieldSpec(1, 2);
+    String input = "hi\thello";
+    String expectedOutput = input;
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k1.0,0 .. should result in start = -1
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1.0,0");
+    testKeySpecs(input, null, helper);
+    
+    // test getStartOffset with -k1,0
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1,0");
+    expectedOutput = input;
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k1.2,0
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1.2,0");
+    expectedOutput = "i\thello";
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k1.1,2.3
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1.1,2.3");
+    expectedOutput = "hi\thel";
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k1.2,2.3
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1.2,2.3");
+    expectedOutput = "i\thel";
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k1.2,3.0
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k1.2,3.0");
+    expectedOutput = "i\thello";
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k2,2
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k2,2");
+    expectedOutput = "hello";
+    testKeySpecs(input, expectedOutput, helper);
+    
+    // test getStartOffset with -k3.1,4.0
+    helper = new KeyFieldHelper();
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k3.1,4.0");
+    testKeySpecs(input, null, helper);
+    
+    // test getStartOffset with -k2.1
+    helper = new KeyFieldHelper();
+    input = "123123123123123hi\thello\thow";
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k2.1");
+    expectedOutput = "hello\thow";
+    testKeySpecs(input, expectedOutput, helper, 15, input.length());
+    
+    // test getStartOffset with -k2.1,3 with end ending on \t
+    helper = new KeyFieldHelper();
+    input = "123123123123123hi\thello\t\thow\tare";
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k2.1,3");
+    expectedOutput = "hello\t";
+    testKeySpecs(input, expectedOutput, helper, 17, input.length());
+    
+    // test getStartOffset with -k2.1 with end ending on \t
+    helper = new KeyFieldHelper();
+    input = "123123123123123hi\thello\thow\tare";
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k2.1");
+    expectedOutput = "hello\thow\t";
+    testKeySpecs(input, expectedOutput, helper, 17, 28);
+    
+    // test getStartOffset with -k2.1,3 with smaller length
+    helper = new KeyFieldHelper();
+    input = "123123123123123hi\thello\thow";
+    helper.setKeyFieldSeparator("\t");
+    helper.parseOption("-k2.1,3");
+    expectedOutput = "hello";
+    testKeySpecs(input, expectedOutput, helper, 15, 23);
+  }
+  
+  private void testKeySpecs(String input, String expectedOutput, 
+                            KeyFieldHelper helper) {
+    testKeySpecs(input, expectedOutput, helper, 0, -1);
+  }
+  
+  private void testKeySpecs(String input, String expectedOutput, 
+                            KeyFieldHelper helper, int s1, int e1) {
+    LOG.info("input : " + input);
+    String keySpecs = helper.keySpecs().get(0).toString();
+    LOG.info("keyspecs : " + keySpecs);
+    byte[] inputBytes = input.getBytes(); // get the input bytes
+    if (e1 == -1) {
+      e1 = inputBytes.length;
+    }
+    LOG.info("length : " + e1);
+    // get the word lengths
+    int[] indices = helper.getWordLengths(inputBytes, s1, e1);
+    // get the start index
+    int start = helper.getStartOffset(inputBytes, s1, e1, indices, 
+                                      helper.keySpecs().get(0));
+    LOG.info("start : " + start);
+    if (expectedOutput == null) {
+      assertEquals("Expected -1 when the start index is invalid", -1, start);
+      return;
+    }
+    // get the end index
+    int end = helper.getEndOffset(inputBytes, s1, e1, indices, 
+                                  helper.keySpecs().get(0));
+    LOG.info("end : " + end);
+    // clamp the end offset to the last valid index of the input
+    end = (end >= inputBytes.length) ? inputBytes.length - 1 : end;
+    int length = end + 1 - start;
+    LOG.info("length : " + length);
+    byte[] outputBytes = new byte[length];
+    System.arraycopy(inputBytes, start, outputBytes, 0, length);
+    String output = new String(outputBytes);
+    LOG.info("output : " + output);
+    LOG.info("expected-output : " + expectedOutput);
+    assertEquals(keySpecs + " failed on input '" + input + "'", 
+                 expectedOutput, output);
+  }
+
+  // check for equality of 2 int arrays
+  private boolean equals(int[] test, int[] expected) {
+    // the first element stores the number of words
+    if (test[0] != expected[0]) {
+      return false;
+    }
+    // word counts match, so compare the recorded lengths
+    for (int i = 0; i < test[0] && i < expected[0]; ++i) {
+      if (test[i] != expected[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
\ No newline at end of file

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/partition/TestKeyFieldHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native
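
KeyFieldHelper, the class under test above, parses sort(1)-style -k specifications
and resolves them to byte offsets within a record. A minimal sketch of driving it
directly on a tab-separated line, using only the calls the test itself exercises:

    KeyFieldHelper helper = new KeyFieldHelper();
    helper.setKeyFieldSeparator("\t");
    helper.parseOption("-k2.1,2.3");            // field 2, characters 1 through 3
    byte[] line = "hi\thello".getBytes();
    int[] lengths = helper.getWordLengths(line, 0, line.length);
    int start = helper.getStartOffset(line, 0, line.length, lengths,
                                      helper.keySpecs().get(0));
    int end = helper.getEndOffset(line, 0, line.length, lengths,
                                  helper.keySpecs().get(0));
    // start..end (inclusive) should delimit "hel" within the record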