Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/11/12 04:06:12 UTC

svn commit: r1540931 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...

Author: sandy
Date: Tue Nov 12 03:06:10 2013
New Revision: 1540931

URL: http://svn.apache.org/r1540931
Log:
MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader (Mariappan Asokan and BitsOfInfo via Sandy Ryza)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1540931&r1=1540930&r2=1540931&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Nov 12 03:06:10 2013
@@ -164,6 +164,9 @@ Release 2.3.0 - UNRELEASED
 
     MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)
 
+    MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader
+    (Mariappan Asokan and BitsOfInfo via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records.  The content of a record need not be
+ * text.  It can be arbitrary binary data.  Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br> or<br>
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ * <br><br>
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+    extends FileInputFormat<LongWritable, BytesWritable>
+    implements JobConfigurable {
+
+  private CompressionCodecFactory compressionCodecs = null;
+  
+  public static final String FIXED_RECORD_LENGTH =
+      "fixedlengthinputformat.record.length"; 
+
+  /**
+   * Set the length of each record
+   * @param conf configuration
+   * @param recordLength the length of a record
+   */
+  public static void setRecordLength(Configuration conf, int recordLength) {
+    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+  }
+
+  /**
+   * Get record length value
+   * @param conf configuration
+   * @return the record length, zero means none was set
+   */
+  public static int getRecordLength(Configuration conf) {
+    return conf.getInt(FIXED_RECORD_LENGTH, 0);
+  }
+
+  @Override
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  @Override
+  public RecordReader<LongWritable, BytesWritable>
+      getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
+      throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    int recordLength = getRecordLength(job);
+    if (recordLength <= 0) {
+      throw new IOException("Fixed record length " + recordLength
+          + " is invalid.  It should be set to a value greater than zero");
+    }
+    return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
+                                       recordLength);
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    return(null == codec);
+  }
+
+}
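
For reference, a minimal old-API driver sketch showing how the format above is wired into a job; the class name, paths, the 100-byte record length, and the identity mapper are illustrative assumptions, not part of this commit:

// Hypothetical driver for the old-API FixedLengthInputFormat; values are placeholders.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FixedLengthInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class FixedLengthDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(FixedLengthDriver.class);
    // Every input record is exactly 100 bytes (arbitrary example value);
    // getRecordReader() throws an IOException if this is not set to a positive value.
    FixedLengthInputFormat.setRecordLength(conf, 100);
    conf.setInputFormat(FixedLengthInputFormat.class);
    conf.setMapperClass(IdentityMapper.class);  // pass records through unchanged
    conf.setNumReduceTasks(0);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(BytesWritable.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}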

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A reader to read fixed length records from a split.  The record offset is
+ * returned as the key and the record bytes are returned as the value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+    implements RecordReader<LongWritable, BytesWritable> {
+
+  private int recordLength;
+  // Make use of the new API implementation to avoid code duplication.
+  private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader;
+
+  public FixedLengthRecordReader(Configuration job, FileSplit split,
+                                 int recordLength) throws IOException {
+    this.recordLength = recordLength;
+    reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader(
+        recordLength);
+    reader.initialize(job, split.getStart(), split.getLength(),
+        split.getPath());
+  }
+
+  @Override
+  public LongWritable createKey() {
+    return new LongWritable();
+  }
+  
+  @Override
+  public BytesWritable createValue() {
+    return new BytesWritable(new byte[recordLength]);
+  }
+  
+  @Override
+  public synchronized boolean next(LongWritable key, BytesWritable value)
+      throws IOException {
+    boolean dataRead = reader.nextKeyValue();
+    if (dataRead) {
+      LongWritable newKey = reader.getCurrentKey();
+      BytesWritable newValue = reader.getCurrentValue();
+      key.set(newKey.get());
+      value.set(newValue);
+    }
+    return dataRead;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return reader.getProgress();
+  }
+  
+  @Override
+  public synchronized long getPos() throws IOException {
+    return reader.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }    
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records.  The content of a record need not be
+ * text.  It can be arbitrary binary data.  Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br> or<br>
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ * <br><br>
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+    extends FileInputFormat<LongWritable, BytesWritable> {
+
+  public static final String FIXED_RECORD_LENGTH =
+      "fixedlengthinputformat.record.length"; 
+
+  /**
+   * Set the length of each record
+   * @param conf configuration
+   * @param recordLength the length of a record
+   */
+  public static void setRecordLength(Configuration conf, int recordLength) {
+    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+  }
+
+  /**
+   * Get record length value
+   * @param conf configuration
+   * @return the record length, zero means none was set
+   */
+  public static int getRecordLength(Configuration conf) {
+    return conf.getInt(FIXED_RECORD_LENGTH, 0);
+  }
+
+  @Override
+  public RecordReader<LongWritable, BytesWritable>
+      createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    int recordLength = getRecordLength(context.getConfiguration());
+    if (recordLength <= 0) {
+      throw new IOException("Fixed record length " + recordLength
+          + " is invalid.  It should be set to a value greater than zero");
+    }
+    return new FixedLengthRecordReader(recordLength);
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec = 
+        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return (null == codec);
+  } 
+
+}
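
A corresponding sketch for the new-API variant above; again the class name, paths, and 100-byte record length are illustrative assumptions:

// Hypothetical driver for the new-API FixedLengthInputFormat; values are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FixedLengthNewApiDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Records are a fixed 100 bytes (arbitrary example value); this must be set
    // before the job runs or createRecordReader() throws an IOException.
    FixedLengthInputFormat.setRecordLength(conf, 100);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setJarByClass(FixedLengthNewApiDriver.class);
    job.setInputFormatClass(FixedLengthInputFormat.class);
    // No mapper or reducer set: the default identity Mapper passes records through.
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}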

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * A reader to read fixed length records from a split.  The record offset is
+ * returned as the key and the record bytes are returned as the value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+    extends RecordReader<LongWritable, BytesWritable> {
+  private static final Log LOG 
+      = LogFactory.getLog(FixedLengthRecordReader.class);
+
+  private int recordLength;
+  private long start;
+  private long pos;
+  private long end;
+  private long  numRecordsRemainingInSplit;
+  private FSDataInputStream fileIn;
+  private Seekable filePosition;
+  private LongWritable key;
+  private BytesWritable value;
+  private boolean isCompressedInput;
+  private Decompressor decompressor;
+  private InputStream inputStream;
+
+  public FixedLengthRecordReader(int recordLength) {
+    this.recordLength = recordLength;
+  }
+
+  @Override
+  public void initialize(InputSplit genericSplit,
+                         TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    final Path file = split.getPath();
+    initialize(job, split.getStart(), split.getLength(), file);
+  }
+
+  // This is also called from the old FixedLengthRecordReader API implementation
+  public void initialize(Configuration job, long splitStart, long splitLength,
+                         Path file) throws IOException {
+    start = splitStart;
+    end = start + splitLength;
+    long partialRecordLength = start % recordLength;
+    long numBytesToSkip = 0;
+    if (partialRecordLength != 0) {
+      numBytesToSkip = recordLength - partialRecordLength;
+    }
+
+    // open the file and seek to the start of the split
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
+
+    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+    if (null != codec) {
+      isCompressedInput = true;	
+      decompressor = CodecPool.getDecompressor(codec);
+      CompressionInputStream cIn
+          = codec.createInputStream(fileIn, decompressor);
+      filePosition = cIn;
+      inputStream = cIn;
+      numRecordsRemainingInSplit = Long.MAX_VALUE;
+      LOG.info(
+          "Compressed input; cannot compute number of records in the split");
+    } else {
+      fileIn.seek(start);
+      filePosition = fileIn;
+      inputStream = fileIn;
+      long splitSize = end - start - numBytesToSkip;
+      numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength;
+      if (numRecordsRemainingInSplit < 0) {
+        numRecordsRemainingInSplit = 0;
+      }
+      LOG.info("Expecting " + numRecordsRemainingInSplit
+          + " records each with a length of " + recordLength
+          + " bytes in the split with an effective size of "
+          + splitSize + " bytes");
+    }
+    if (numBytesToSkip != 0) {
+      start += inputStream.skip(numBytesToSkip);
+    }
+    this.pos = start;
+  }
+
+  @Override
+  public synchronized boolean nextKeyValue() throws IOException {
+    if (key == null) {
+      key = new LongWritable();
+    }
+    if (value == null) {
+      value = new BytesWritable(new byte[recordLength]);
+    }
+    boolean dataRead = false;
+    value.setSize(recordLength);
+    byte[] record = value.getBytes();
+    if (numRecordsRemainingInSplit > 0) {
+      key.set(pos);
+      int offset = 0;
+      int numBytesToRead = recordLength;
+      int numBytesRead = 0;
+      while (numBytesToRead > 0) {
+        numBytesRead = inputStream.read(record, offset, numBytesToRead);
+        if (numBytesRead == -1) {
+          // EOF
+          break;
+        }
+        offset += numBytesRead;
+        numBytesToRead -= numBytesRead;
+      }
+      numBytesRead = recordLength - numBytesToRead;
+      pos += numBytesRead;
+      if (numBytesRead > 0) {
+        dataRead = true;
+        if (numBytesRead >= recordLength) {
+          if (!isCompressedInput) {
+            numRecordsRemainingInSplit--;
+          }
+        } else {
+          throw new IOException("Partial record(length = " + numBytesRead
+              + ") found at the end of split.");
+        }
+      } else {
+        numRecordsRemainingInSplit = 0L; // End of input.
+      }
+    }
+    return dataRead;
+  }
+
+  @Override
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public BytesWritable getCurrentValue() {
+    return value;
+  }
+
+  @Override
+  public synchronized float getProgress() throws IOException {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
+    }
+  }
+  
+  @Override
+  public synchronized void close() throws IOException {
+    try {
+      if (inputStream != null) {
+        inputStream.close();
+        inputStream = null;
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+  }
+
+  // This is called from the old FixedLengthRecordReader API implementation.
+  public long getPos() {
+    return pos;
+  }
+
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput && null != filePosition) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+}
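
The split-alignment arithmetic in initialize() above is the subtle part of this reader: a split rarely begins on a record boundary, so the reader skips forward to the next boundary and then reads past the nominal split end to complete its final record, while the following split skips those same bytes. A standalone sketch of that computation, with made-up example numbers:

// Illustrates the boundary math in FixedLengthRecordReader.initialize();
// recordLength, splitStart, and splitLength below are made-up example values.
public class SplitAlignmentExample {
  public static void main(String[] args) {
    int recordLength = 5;
    long splitStart = 12, splitLength = 10;
    long start = splitStart;
    long end = start + splitLength;                   // split covers bytes [12, 22)
    long partialRecordLength = start % recordLength;  // 2 bytes belong to the previous split's last record
    long numBytesToSkip =
        (partialRecordLength != 0) ? recordLength - partialRecordLength : 0;  // skip 3, read from byte 15
    long splitSize = end - start - numBytesToSkip;    // 7 effective bytes
    long numRecords = (splitSize + recordLength - 1) / recordLength;          // ceil(7/5) = 2 records
    System.out.println("first byte read = " + (start + numBytesToSkip)
        + ", records in split = " + numRecords
        + ", last byte read = " + (start + numBytesToSkip + numRecords * recordLength - 1));
    // Prints: first byte read = 15, records in split = 2, last byte read = 24.
    // The 3 bytes past 'end' (22..24) complete the last record; the next split,
    // starting at byte 22, computes 22 % 5 = 2 and skips those same 3 bytes.
  }
}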

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+  private static Log LOG;
+  private static Configuration defaultConf;
+  private static FileSystem localFs; 
+  private static Path workDir;
+  private static Reporter voidReporter;
+  
+  // some chars for the record data
+  private static char[] chars;
+  private static Random charRand;
+
+  @BeforeClass
+  public static void onlyOnce() {
+    try {
+      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+      defaultConf = new Configuration();
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+      voidReporter = Reporter.NULL;
+      // our set of chars
+      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
+      workDir = 
+          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+          "TestKeyValueFixedLengthInputFormat");
+      charRand = new Random();
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * uncompressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormat() throws IOException {
+    runRandomTests(null);
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * compressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormatCompressedIn() throws IOException {
+    runRandomTests(new GzipCodec());
+  }
+
+  /**
+   * Test with no record length set.
+   */
+  @Test (timeout=5000)
+  public void testNoRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Intentionally do not set the fixed record length config property
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for not setting record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to 0
+   */
+  @Test (timeout=5000)
+  public void testZeroRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(job, 0);
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+                             format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for zero record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to a negative value
+   */
+  @Test (timeout=5000)
+  public void testNegativeRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(job, -10);
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for negative record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with partial record at the end of a compressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordCompressedIn() throws IOException {
+    CompressionCodec gzip = new GzipCodec();
+    runPartialRecordTest(gzip);
+  }
+
+  /**
+   * Test with partial record at the end of an uncompressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordUncompressedIn() throws IOException {
+    runPartialRecordTest(null);
+  }
+
+  /**
+   * Test using the gzip codec with two input files.
+   */
+  @Test (timeout=5000)
+  public void testGzipWithTwoInputs() throws IOException {
+    CompressionCodec gzip = new GzipCodec();
+    localFs.delete(workDir, true);
+    // Create files with fixed length records with 5 byte long records.
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+        "one  two  threefour five six  seveneightnine ten  ");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+        "ten  nine eightsevensix  five four threetwo  one  ");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(defaultConf, 5);
+    JobConf job = new JobConf(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    ReflectionUtils.setConf(gzip, job);
+    format.configure(job);
+    InputSplit[] splits = format.getSplits(job, 100);
+    assertEquals("compressed splits == 2", 2, splits.length);
+    FileSplit tmp = (FileSplit) splits[0];
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits[0] = splits[1];
+      splits[1] = tmp;
+    }
+    List<String> results = readSplit(format, splits[0], job);
+    assertEquals("splits[0] length", 10, results.size());
+    assertEquals("splits[0][5]", "six  ", results.get(5));
+    results = readSplit(format, splits[1], job);
+    assertEquals("splits[1] length", 10, results.size());
+    assertEquals("splits[1][0]", "ten  ", results.get(0));
+    assertEquals("splits[1][1]", "nine ", results.get(1));
+  }
+
+  // Create a file containing fixed length records with random data
+  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
+                                       int recordLen,
+                                       int numRecords) throws IOException {
+    ArrayList<String> recordList = new ArrayList<String>(numRecords);
+    OutputStream ostream = localFs.create(targetFile);
+    if (codec != null) {
+      ostream = codec.createOutputStream(ostream);
+    }
+    Writer writer = new OutputStreamWriter(ostream);
+    try {
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < numRecords; i++) {
+        for (int j = 0; j < recordLen; j++) {
+          sb.append(chars[charRand.nextInt(chars.length)]);
+        }
+        String recordData = sb.toString();
+        recordList.add(recordData);
+        writer.write(recordData);
+        sb.setLength(0);
+      }
+    } finally {
+      writer.close();
+    }
+    return recordList;
+  }
+
+  private void runRandomTests(CompressionCodec codec) throws IOException {
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, fileName.toString());
+    int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
+    Random random = new Random(seed);
+    int MAX_TESTS = 20;
+    LongWritable key = new LongWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (int i = 0; i < MAX_TESTS; i++) {
+      LOG.info("----------------------------------------------------------");
+      // Maximum total records of 999
+      int totalRecords = random.nextInt(999)+1;
+      // Test an empty file
+      if (i == 8) {
+         totalRecords = 0;
+      }
+      // Maximum bytes in a record of 100K
+      int recordLength = random.nextInt(1024*100)+1;
+      // For the 11th test, force a record length of 1
+      if (i == 10) {
+        recordLength = 1;
+      }
+      // The total bytes in the test file
+      int fileSize = (totalRecords * recordLength);
+      LOG.info("totalRecords=" + totalRecords + " recordLength="
+          + recordLength);
+      // Create the test file
+      ArrayList<String> recordList
+          = createFile(file, codec, recordLength, totalRecords);
+      assertTrue(localFs.exists(file));
+      // Set the fixed length record length config property 
+      Configuration testConf = new Configuration(defaultConf);
+      FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+      int numSplits = 1;
+      // Arbitrarily set number of splits.
+      if (i > 0) {
+        if (i == (MAX_TESTS-1)) {
+          // Test a split size that is less than record len
+          numSplits = (int)(fileSize/Math.floor(recordLength/2));
+        } else {
+          if (MAX_TESTS % i == 0) {
+            // Let us create a split size that is forced to be 
+            // smaller than the end file itself, (ensures 1+ splits)
+            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+          } else {
+            // Just pick a random split size with no upper bound 
+            numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
+          }
+        }
+        LOG.info("Number of splits set to: " + numSplits);
+      }
+
+      // Create the job, and setup the input path
+      JobConf job = new JobConf(testConf);
+      FileInputFormat.setInputPaths(job, workDir);
+      // Try splitting the file in a variety of sizes
+      FixedLengthInputFormat format = new FixedLengthInputFormat();
+      format.configure(job);
+      InputSplit splits[] = format.getSplits(job, numSplits);
+      LOG.info("Actual number of splits = " + splits.length);
+      // Test combined split lengths = total file size
+      long recordOffset = 0;
+      int recordNumber = 0;
+      for (InputSplit split : splits) {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+        Class<?> clazz = reader.getClass();
+        assertEquals("RecordReader class should be FixedLengthRecordReader:", 
+            FixedLengthRecordReader.class, clazz);
+        // Plow through the records in this split
+        while (reader.next(key, value)) {
+          assertEquals("Checking key", (long)(recordNumber*recordLength),
+              key.get());
+          String valueString =
+              new String(value.getBytes(), 0, value.getLength());
+          assertEquals("Checking record length:", recordLength,
+              value.getLength());
+          assertTrue("Checking for more records than expected:",
+              recordNumber < totalRecords);
+          String origRecord = recordList.get(recordNumber);
+          assertEquals("Checking record content:", origRecord, valueString);
+          recordNumber++;
+        }
+        reader.close();
+      }
+      assertEquals("Total original records should be total read records:",
+          recordList.size(), recordNumber);
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<String> readSplit(FixedLengthInputFormat format, 
+                                        InputSplit split, 
+                                        JobConf job) throws IOException {
+    List<String> result = new ArrayList<String>();
+    RecordReader<LongWritable, BytesWritable> reader =
+        format.getRecordReader(split, job, voidReporter);
+    LongWritable key = reader.createKey();
+    BytesWritable value = reader.createValue();
+    while (reader.next(key, value)) {
+      result.add(new String(value.getBytes(), 0, value.getLength()));
+    }
+    reader.close();
+    return result;
+  }
+
+  private void runPartialRecordTest(CompressionCodec codec) throws IOException {
+    localFs.delete(workDir, true);
+    // Create a file with fixed length records with 5 byte long
+    // records with a partial record at the end.
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+        "one  two  threefour five six  seveneightnine ten");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(defaultConf, 5);
+    JobConf job = new JobConf(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    if (codec != null) {
+      ReflectionUtils.setConf(codec, job);
+    }
+    format.configure(job);
+    InputSplit[] splits = format.getSplits(job, 100);
+    if (codec != null) {
+      assertEquals("compressed splits == 1", 1, splits.length);
+    }
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        List<String> results = readSplit(format, split, job);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for partial record:", exceptionThrown);
+  }
+
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java?rev=1540931&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java Tue Nov 12 03:06:10 2013
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+  private static Log LOG;
+  private static Configuration defaultConf;
+  private static FileSystem localFs;
+  private static Path workDir;
+
+  // some chars for the record data
+  private static char[] chars;
+  private static Random charRand;
+
+  @BeforeClass
+  public static void onlyOnce() {
+    try {
+      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+      defaultConf = new Configuration();
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+      // our set of chars
+      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
+      workDir = 
+          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+          "TestKeyValueFixedLengthInputFormat");
+      charRand = new Random();
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * uncompressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormat() throws Exception {
+    runRandomTests(null);
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * compressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormatCompressedIn() throws Exception {
+    runRandomTests(new GzipCodec());
+  }
+
+  /**
+   * Test with no record length set.
+   */
+  @Test (timeout=5000)
+  public void testNoRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Intentionally do not set the fixed record length config property
+    Configuration testConf = new Configuration(defaultConf);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader =
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for not setting record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to 0
+   */
+  @Test (timeout=5000)
+  public void testZeroRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(testConf, 0);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context =
+            MapReduceTestUtil.createDummyMapTaskAttemptContext(
+            job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for zero record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to a negative value
+   */
+  @Test (timeout=5000)
+  public void testNegativeRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(testConf, -10);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for negative record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with partial record at the end of a compressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordCompressedIn() throws Exception {
+    CompressionCodec gzip = new GzipCodec();
+    runPartialRecordTest(gzip);
+  }
+
+  /**
+   * Test with partial record at the end of an uncompressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordUncompressedIn() throws Exception {
+    runPartialRecordTest(null);
+  }
+
+  /**
+   * Test using the gzip codec with two input files.
+   */
+  @Test (timeout=5000)
+  public void testGzipWithTwoInputs() throws Exception {
+    CompressionCodec gzip = new GzipCodec();
+    localFs.delete(workDir, true);
+    // Create files with fixed length records with 5 byte long records.
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+        "one  two  threefour five six  seveneightnine ten  ");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+        "ten  nine eightsevensix  five four threetwo  one  ");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(defaultConf, 5);
+    ReflectionUtils.setConf(gzip, defaultConf);
+    Job job = Job.getInstance(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    assertEquals("compressed splits == 2", 2, splits.size());
+    FileSplit tmp = (FileSplit) splits.get(0);
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits.set(0, splits.get(1));
+      splits.set(1, tmp);
+    }
+    List<String> results = readSplit(format, splits.get(0), job);
+    assertEquals("splits[0] length", 10, results.size());
+    assertEquals("splits[0][5]", "six  ", results.get(5));
+    results = readSplit(format, splits.get(1), job);
+    assertEquals("splits[1] length", 10, results.size());
+    assertEquals("splits[1][0]", "ten  ", results.get(0));
+    assertEquals("splits[1][1]", "nine ", results.get(1));
+  }
+
+  // Create a file containing fixed length records with random data
+  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
+                                       int recordLen,
+                                       int numRecords) throws IOException {
+    ArrayList<String> recordList = new ArrayList<String>(numRecords);
+    OutputStream ostream = localFs.create(targetFile);
+    if (codec != null) {
+      ostream = codec.createOutputStream(ostream);
+    }
+    Writer writer = new OutputStreamWriter(ostream);
+    try {
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < numRecords; i++) {
+        for (int j = 0; j < recordLen; j++) {
+          sb.append(chars[charRand.nextInt(chars.length)]);
+        }
+        String recordData = sb.toString();
+        recordList.add(recordData);
+        writer.write(recordData);
+        sb.setLength(0);
+      }
+    } finally {
+      writer.close();
+    }
+    return recordList;
+  }
+
+  private void runRandomTests(CompressionCodec codec) throws Exception {
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, fileName.toString());
+    int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
+    Random random = new Random(seed);
+    int MAX_TESTS = 20;
+    LongWritable key;
+    BytesWritable value;
+
+    for (int i = 0; i < MAX_TESTS; i++) {
+      LOG.info("----------------------------------------------------------");
+      // Maximum total records of 999
+      int totalRecords = random.nextInt(999)+1;
+      // Test an empty file
+      if (i == 8) {
+         totalRecords = 0;
+      }
+      // Maximum bytes in a record of 100K
+      int recordLength = random.nextInt(1024*100)+1;
+      // For the 11th test, force a record length of 1
+      if (i == 10) {
+        recordLength = 1;
+      }
+      // The total bytes in the test file
+      int fileSize = (totalRecords * recordLength);
+      LOG.info("totalRecords=" + totalRecords + " recordLength="
+          + recordLength);
+      // Create the test file
+      ArrayList<String> recordList =
+          createFile(file, codec, recordLength, totalRecords);
+      assertTrue(localFs.exists(file));
+      // Set the fixed length record length config property 
+      Configuration testConf = new Configuration(defaultConf);
+      FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+      int numSplits = 1;
+      // Arbitrarily set number of splits.
+      if (i > 0) {
+        if (i == (MAX_TESTS-1)) {
+          // Test a split size that is less than record len
+          numSplits = (int)(fileSize/Math.floor(recordLength/2));
+        } else {
+          if (MAX_TESTS % i == 0) {
+            // Let us create a split size that is forced to be 
+            // smaller than the end file itself, (ensures 1+ splits)
+            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+          } else {
+            // Just pick a random split size with no upper bound 
+            numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
+          }
+        }
+        LOG.info("Number of splits set to: " + numSplits);
+      }
+      testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 
+          (long)(fileSize/numSplits));
+
+      // Create the job, and setup the input path
+      Job job = Job.getInstance(testConf);
+      FileInputFormat.setInputPaths(job, workDir);
+      // Try splitting the file in a variety of sizes
+      FixedLengthInputFormat format = new FixedLengthInputFormat();
+      List<InputSplit> splits = format.getSplits(job);
+      LOG.info("Actual number of splits = " + splits.size());
+      // Test combined split lengths = total file size
+      long recordOffset = 0;
+      int recordNumber = 0;
+      for (InputSplit split : splits) {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+        Class<?> clazz = reader.getClass();
+        assertEquals("RecordReader class should be FixedLengthRecordReader:", 
+            FixedLengthRecordReader.class, clazz);
+        // Plow through the records in this split
+        while (reader.nextKeyValue()) {
+          key = reader.getCurrentKey();
+          value = reader.getCurrentValue();
+          assertEquals("Checking key", (long)(recordNumber*recordLength),
+              key.get());
+          String valueString = new String(value.getBytes(), 0,
+              value.getLength());
+          assertEquals("Checking record length:", recordLength,
+              value.getLength());
+          assertTrue("Checking for more records than expected:",
+              recordNumber < totalRecords);
+          String origRecord = recordList.get(recordNumber);
+          assertEquals("Checking record content:", origRecord, valueString);
+          recordNumber++;
+        }
+        reader.close();
+      }
+      assertEquals("Total original records should be total read records:",
+          recordList.size(), recordNumber);
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<String> readSplit(FixedLengthInputFormat format, 
+                                        InputSplit split, 
+                                        Job job) throws Exception {
+    List<String> result = new ArrayList<String>();
+    TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+    RecordReader<LongWritable, BytesWritable> reader =
+        format.createRecordReader(split, context);
+    MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+        mcontext =
+        new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+        BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mcontext);
+    LongWritable key;
+    BytesWritable value;
+    while (reader.nextKeyValue()) {
+      key = reader.getCurrentKey();
+      value = reader.getCurrentValue();
+      result.add(new String(value.getBytes(), 0, value.getLength()));
+    }
+    reader.close();
+    return result;
+  }
+
+  private void runPartialRecordTest(CompressionCodec codec) throws Exception {
+    localFs.delete(workDir, true);
+    // Create a file with fixed length records with 5 byte long
+    // records with a partial record at the end.
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+      ReflectionUtils.setConf(codec, defaultConf);
+    }
+    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+        "one  two  threefour five six  seveneightnine ten");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(defaultConf, 5);
+    Job job = Job.getInstance(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    if (codec != null) {
+      assertEquals("compressed splits == 1", 1, splits.size());
+    }
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        List<String> results = readSplit(format, split, job);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for partial record:", exceptionThrown);
+  }
+
+}