Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/07/10 10:53:51 UTC

svn commit: r792839 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/

Author: sharad
Date: Fri Jul 10 08:53:51 2009
New Revision: 792839

URL: http://svn.apache.org/viewvc?rev=792839&view=rev
Log:
MAPREDUCE-655. Change KeyValueLineRecordReader and KeyValueTextInputFormat to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
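
As context for the change, wiring the new input format into a job looks roughly like this (a minimal sketch, not part of the commit; the input path and configuration are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

    Configuration conf = new Configuration();
    Job job = new Job(conf);
    // Both keys and values are Text; each input line is split at the
    // first separator byte (tab by default).
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/in"));  // hypothetical path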

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 10 08:53:51 2009
@@ -90,6 +90,9 @@
     MAPREDUCE-623. Resolve javac warnings in mapreduce. (Jothi Padmanabhan
     via sharad)
 
+    MAPREDUCE-655. Change KeyValueLineRecordReader and KeyValueTextInputFormat
+    to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Fri Jul 10 08:53:51 2009
@@ -29,7 +29,12 @@
  * separator character. The separator can be specified in config file 
  * under the attribute name key.value.separator.in.input.line. The default
  * separator is the tab character ('\t').
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader} 
+ * instead
  */
+@Deprecated
 public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
   
   private final LineRecordReader lineRecordReader;
@@ -60,20 +65,15 @@
     this.separator = (byte) sepStr.charAt(0);
   }
 
-  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
-    for (int i = start; i < (start + length); i++) {
-      if (utf[i] == sep) {
-        return i;
-      }
-    }
-    return -1;
+  public static int findSeparator(byte[] utf, int start, int length, 
+      byte sep) {
+    return org.apache.hadoop.mapreduce.lib.input.
+      KeyValueLineRecordReader.findSeparator(utf, start, length, sep);
   }
 
   /** Read key/value pair in a line. */
   public synchronized boolean next(Text key, Text value)
     throws IOException {
-    Text tKey = key;
-    Text tValue = value;
     byte[] line = null;
     int lineLen = -1;
     if (lineRecordReader.next(dummyKey, innerValue)) {
@@ -85,19 +85,8 @@
     if (line == null)
       return false;
     int pos = findSeparator(line, 0, lineLen, this.separator);
-    if (pos == -1) {
-      tKey.set(line, 0, lineLen);
-      tValue.set("");
-    } else {
-      int keyLen = pos;
-      byte[] keyBytes = new byte[keyLen];
-      System.arraycopy(line, 0, keyBytes, 0, keyLen);
-      int valLen = lineLen - keyLen - 1;
-      byte[] valBytes = new byte[valLen];
-      System.arraycopy(line, pos + 1, valBytes, 0, valLen);
-      tKey.set(keyBytes);
-      tValue.set(valBytes);
-    }
+    org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader.
+      setKeyValue(key, value, line, lineLen, pos);
     return true;
   }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -30,7 +30,12 @@
  * Either linefeed or carriage-return are used to signal end of line. Each line
  * is divided into key and value parts by a separator byte. If no such a byte
  * exists, the key will be the entire line and value will be empty.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat} 
+ * instead
  */
+@Deprecated
 public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>
   implements JobConfigurable {
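
With both mapred classes now deprecated, old-API job setup such as the following still compiles but emits a deprecation warning, and should move to Job.setInputFormatClass on the new class, as sketched under the log message above (a sketch; JobConf is the old API's job configuration class):

    import org.apache.hadoop.mapred.JobConf;

    JobConf jobConf = new JobConf();
    // Deprecated after this commit; the new-API equivalent is
    // job.setInputFormatClass(
    //     org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat.class)
    jobConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);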
 

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a 
+ * separator character. The separator can be specified in config file 
+ * under the attribute name key.value.separator.in.input.line. The default
+ * separator is the tab character ('\t').
+ */
+public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
+  
+  private final LineRecordReader lineRecordReader;
+
+  private byte separator = (byte) '\t';
+
+  private Text innerValue;
+
+  private Text key;
+  
+  private Text value;
+  
+  public Class<?> getKeyClass() { return Text.class; }
+  
+  public KeyValueLineRecordReader(Configuration conf)
+    throws IOException {
+    
+    lineRecordReader = new LineRecordReader();
+    String sepStr = conf.get("key.value.separator.in.input.line", "\t");
+    this.separator = (byte) sepStr.charAt(0);
+  }
+
+  public void initialize(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+    lineRecordReader.initialize(genericSplit, context);
+  }
+  
+  public static int findSeparator(byte[] utf, int start, int length, 
+      byte sep) {
+    for (int i = start; i < (start + length); i++) {
+      if (utf[i] == sep) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static void setKeyValue(Text key, Text value, byte[] line,
+      int lineLen, int pos) {
+    if (pos == -1) {
+      key.set(line, 0, lineLen);
+      value.set("");
+    } else {
+      int keyLen = pos;
+      byte[] keyBytes = new byte[keyLen];
+      System.arraycopy(line, 0, keyBytes, 0, keyLen);
+      int valLen = lineLen - keyLen - 1;
+      byte[] valBytes = new byte[valLen];
+      System.arraycopy(line, pos + 1, valBytes, 0, valLen);
+      key.set(keyBytes);
+      value.set(valBytes);
+    }
+  }
+  /** Read key/value pair in a line. */
+  public synchronized boolean nextKeyValue()
+    throws IOException {
+    byte[] line = null;
+    int lineLen = -1;
+    if (lineRecordReader.nextKeyValue()) {
+      innerValue = lineRecordReader.getCurrentValue();
+      line = innerValue.getBytes();
+      lineLen = innerValue.getLength();
+    } else {
+      return false;
+    }
+    if (line == null)
+      return false;
+    if (key == null) {
+      key = new Text();
+    }
+    if (value == null) {
+      value = new Text();
+    }
+    int pos = findSeparator(line, 0, lineLen, this.separator);
+    setKeyValue(key, value, line, lineLen, pos);
+    return true;
+  }
+  
+  public Text getCurrentKey() {
+    return key;
+  }
+
+  public Text getCurrentValue() {
+    return value;
+  }
+
+  public float getProgress() {
+    return lineRecordReader.getProgress();
+  }
+  
+  public synchronized void close() throws IOException { 
+    lineRecordReader.close();
+  }
+}
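
The two static helpers above carry the parsing logic that the deprecated mapred reader now delegates to; their behaviour on a sample line (a self-contained sketch using only methods added in this file):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

    byte[] line = "alpha\tbeta".getBytes("UTF-8");
    Text key = new Text();
    Text value = new Text();
    // Index of the first separator byte, or -1 if the line has none.
    int pos = KeyValueLineRecordReader.findSeparator(
        line, 0, line.length, (byte) '\t');  // pos == 5
    // Splits at pos; with pos == -1 the whole line becomes the key
    // and the value is set to the empty string.
    KeyValueLineRecordReader.setKeyValue(key, value, line, line.length, pos);
    // key is now "alpha", value is "beta"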

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either line feed or carriage-return are used to signal end of line. 
+ * Each line is divided into key and value parts by a separator byte. If no
+ * such a byte exists, the key will be the entire line and value will be empty.
+ */
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodec codec = 
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+
+  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+    
+    context.setStatus(genericSplit.toString());
+    return new KeyValueLineRecordReader(context.getConfiguration());
+  }
+
+}
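
Since the record reader picks the separator up from the configuration, switching from the default tab to another character is purely a configuration change (a sketch; note that only the first character of the value is used, per the reader's constructor above):

    Configuration conf = new Configuration();
    conf.set("key.value.separator.in.input.line", ",");
    Job job = new Job(conf);
    job.setInputFormatClass(KeyValueTextInputFormat.class);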

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jul 10 08:53:51 2009
@@ -258,4 +258,26 @@
     job.setNumReduceTasks(numReds);
     return job;
   }
+
+  public static TaskAttemptContext createDummyMapTaskAttemptContext(
+      Configuration conf) {
+    TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+    conf.set("mapred.task.id", tid.toString());
+    return new TaskAttemptContext(conf, tid);    
+  }
+
+  public static StatusReporter createDummyReporter() {
+    return new StatusReporter() {
+      public void setStatus(String s) {
+      }
+      public void progress() {
+      }
+      public Counter getCounter(Enum<?> name) {
+        return new Counters().findCounter(name);
+      }
+      public Counter getCounter(String group, String name) {
+        return new Counters().findCounter(group, name);
+      }
+    };
+  }
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class TestMRKeyValueTextInputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMRKeyValueTextInputFormat.class.getName());
+
+  private static int MAX_LENGTH = 10000;
+  
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null; 
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestKeyValueTextInputFormat");
+  
+  public void testFormat() throws Exception {
+    Job job = new Job(new Configuration());
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    LOG.info("seed = " + seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+      LOG.debug("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i * 2));
+          writer.write("\t");
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH / 20) + 1;
+        LOG.debug("splitting: requesting = " + numSplits);
+        List<InputSplit> splits = format.getSplits(job);
+        LOG.debug("splitting: got =        " + splits.size());
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.size(); j++) {
+          LOG.debug("split["+j+"]= " + splits.get(j));
+          TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+          RecordReader<Text, Text> reader = format.createRecordReader(
+            splits.get(j), context);
+          Class<?> clazz = reader.getClass();
+          assertEquals("reader class is KeyValueLineRecordReader.", 
+            KeyValueLineRecordReader.class, clazz);
+          MapContext<Text, Text, Text, Text> mcontext = 
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(), 
+            context.getTaskAttemptID(), reader, null, null, 
+            MapReduceTestUtil.createDummyReporter(), splits.get(j));
+          reader.initialize(splits.get(j), mcontext);
+
+          Text key = null;
+          Text value = null;
+          try {
+            int count = 0;
+            while (reader.nextKeyValue()) {
+              key = reader.getCurrentKey();
+              clazz = key.getClass();
+              assertEquals("Key class is Text.", Text.class, clazz);
+              value = reader.getCurrentValue();
+              clazz = value.getClass();
+              assertEquals("Value class is Text.", Text.class, clazz);
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              count++;
+            }
+            LOG.debug("splits[" + j + "]=" + splits.get(j) +" count=" + count);
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+  private LineReader makeStream(String str) throws IOException {
+    return new LineReader(
+        new ByteArrayInputStream(str.getBytes("UTF-8")),
+        defaultConf);
+  }
+  
+  public void testUTF8() throws Exception {
+    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
+    Text line = new Text();
+    in.readLine(line);
+    assertEquals("readLine changed utf8 characters", 
+                 "abcd\u20acbdcd\u20ac", line.toString());
+    in = makeStream("abc\u200axyz");
+    in.readLine(line);
+    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+  }
+
+  public void testNewLines() throws Exception {
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out);
+    assertEquals("line2 length", 2, out.getLength());
+    in.readLine(out);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out);
+    assertEquals("line6 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
+  }
+  
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+  
+  private static List<Text> readSplit(KeyValueTextInputFormat format, 
+      InputSplit split, Job job) throws IOException, InterruptedException {
+    List<Text> result = new ArrayList<Text>();
+    Configuration conf = job.getConfiguration();
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(conf);
+    RecordReader<Text, Text> reader = format.createRecordReader(split, 
+      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+    MapContext<Text, Text, Text, Text> mcontext = 
+      new MapContext<Text, Text, Text, Text>(conf, 
+      context.getTaskAttemptID(), reader, null, null,
+      MapReduceTestUtil.createDummyReporter(), 
+      split);
+    reader.initialize(split, mcontext);
+    while (reader.nextKeyValue()) {
+      result.add(new Text(reader.getCurrentValue()));
+    }
+    return result;
+  }
+  
+  /**
+   * Test using the gzip codec for reading
+   */
+  public void testGzip() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, conf);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+              "line-1\tthe quick\nline-2\tbrown\nline-3\t" +
+              "fox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "line-1\tthis is a test\nline-1\tof gzip\n");
+    Job job = new Job(conf);
+    FileInputFormat.setInputPaths(job, workDir);
+    KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    assertEquals("compressed splits == 2", 2, splits.size());
+    FileSplit tmp = (FileSplit) splits.get(0);
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits.set(0, splits.get(1));
+      splits.set(1, tmp);
+    }
+    List<Text> results = readSplit(format, splits.get(0), job);
+    assertEquals("splits[0] length", 6, results.size());
+    assertEquals("splits[0][0]", "the quick", results.get(0).toString());
+    assertEquals("splits[0][1]", "brown", results.get(1).toString());
+    assertEquals("splits[0][2]", "fox jumped", results.get(2).toString());
+    assertEquals("splits[0][3]", "over", results.get(3).toString());
+    assertEquals("splits[0][4]", " the lazy", results.get(4).toString());
+    assertEquals("splits[0][5]", " dog", results.get(5).toString());
+    results = readSplit(format, splits.get(1), job);
+    assertEquals("splits[1] length", 2, results.size());
+    assertEquals("splits[1][0]", "this is a test", 
+                 results.get(0).toString());    
+    assertEquals("splits[1][1]", "of gzip", 
+                 results.get(1).toString());    
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestMRKeyValueTextInputFormat().testFormat();
+  }
+}