Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2013/04/24 16:11:51 UTC

svn commit: r1471424 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/h...

Author: bobby
Date: Wed Apr 24 14:11:50 2013
New Revision: 1471424

URL: http://svn.apache.org/r1471424
Log:
MAPREDUCE-5069. add concrete common implementations of CombineFileInputFormat (Sangjin Lee via bobby)
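
For context, a minimal sketch of how a job could adopt the new text format once
this change is in (hypothetical driver class and input path; the split-size key
is standard FileInputFormat configuration and an assumption here, not something
this commit touches):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SmallFilesDriver {  // hypothetical driver
      public static Job createJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf);
        // pack many small text files into a handful of map tasks
        job.setInputFormatClass(CombineTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/small-files"));
        // cap each combined split at 128 MB (standard split-size key; an
        // assumption, this commit adds no split-size configuration)
        job.getConfiguration().setLong(
            "mapreduce.input.fileinputformat.split.maxsize",
            128L * 1024 * 1024);
        return job;
      }
    }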

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1471424&r1=1471423&r2=1471424&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Apr 24 14:11:50 2013
@@ -209,6 +209,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5175. Updated MR App to not set envs that will be set by NMs
     anyways after YARN-561. (Xuan Gong via vinodkv)
 
+    MAPREDUCE-5069. add concrete common implementations of
+    CombineFileInputFormat (Sangjin Lee via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineFileRecordReaderWrapper.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A wrapper class for a record reader that handles a single file split. It
+ * delegates most of the methods to the wrapped instance. A concrete subclass
+ * needs to provide a constructor that calls this parent constructor with the
+ * appropriate input format. The subclass constructor must satisfy the specific
+ * constructor signature that is required by
+ * <code>CombineFileRecordReader</code>.
+ *
+ * Subclassing is needed to get a concrete record reader wrapper because of the
+ * constructor requirement.
+ *
+ * @see CombineFileRecordReader
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class CombineFileRecordReaderWrapper<K,V>
+  implements RecordReader<K,V> {
+  private final RecordReader<K,V> delegate;
+
+  protected CombineFileRecordReaderWrapper(FileInputFormat<K,V> inputFormat,
+    CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx)
+    throws IOException {
+    FileSplit fileSplit = new FileSplit(split.getPath(idx),
+      split.getOffset(idx),
+      split.getLength(idx),
+      split.getLocations());
+
+    delegate = inputFormat.getRecordReader(fileSplit, (JobConf)conf, reporter);
+  }
+
+  public boolean next(K key, V value) throws IOException {
+    return delegate.next(key, value);
+  }
+
+  public K createKey() {
+    return delegate.createKey();
+  }
+
+  public V createValue() {
+    return delegate.createValue();
+  }
+
+  public long getPos() throws IOException {
+    return delegate.getPos();
+  }
+
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  public float getProgress() throws IOException {
+    return delegate.getProgress();
+  }
+}
\ No newline at end of file
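
The constructor contract the Javadoc above describes is easiest to see in a
small sketch (not part of this commit; KeyValueTextInputFormat is just one
old-API format a user might want to combine this way):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.KeyValueTextInputFormat;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReaderWrapper;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class KeyValueTextRecordReaderWrapper
        extends CombineFileRecordReaderWrapper<Text, Text> {
      // this exact signature is what CombineFileRecordReader constructs
      // reflectively, one instance per file chunk in the combined split
      public KeyValueTextRecordReaderWrapper(CombineFileSplit split,
          Configuration conf, Reporter reporter, Integer idx)
          throws IOException {
        super(new KeyValueTextInputFormat(), split, conf, reporter, idx);
      }
    }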

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineSequenceFileInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
+ * <code>SequenceFileInputFormat</code>.
+ *
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineSequenceFileInputFormat<K,V>
+  extends CombineFileInputFormat<K,V> {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public RecordReader<K,V> getRecordReader(InputSplit split, JobConf conf,
+    Reporter reporter) throws IOException {
+    return new CombineFileRecordReader(conf, (CombineFileSplit)split, reporter,
+      SequenceFileRecordReaderWrapper.class);
+  }
+
+  /**
+   * A record reader that may be passed to <code>CombineFileRecordReader</code>
+   * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
+   * for <code>SequenceFileInputFormat</code>.
+   *
+   * @see CombineFileRecordReader
+   * @see CombineFileInputFormat
+   * @see SequenceFileInputFormat
+   */
+  private static class SequenceFileRecordReaderWrapper<K,V>
+    extends CombineFileRecordReaderWrapper<K,V> {
+    // this constructor signature is required by CombineFileRecordReader
+    public SequenceFileRecordReaderWrapper(CombineFileSplit split,
+      Configuration conf, Reporter reporter, Integer idx) throws IOException {
+      super(new SequenceFileInputFormat<K,V>(), split, conf, reporter, idx);
+    }
+  }
+}
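
Wiring the format into an old-API job is then a one-liner; a hedged sketch
(hypothetical driver class and input path):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.CombineSequenceFileInputFormat;

    public class OldApiSeqDriver {  // hypothetical driver
      public static JobConf configure(JobConf job) {
        // each map task now reads several small sequence files per split
        job.setInputFormat(CombineSequenceFileInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/data/seqfiles"));
        return job;
      }
    }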

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/CombineTextInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
+ * <code>TextInputFormat</code>.
+ *
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineTextInputFormat
+  extends CombineFileInputFormat<LongWritable,Text> {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public RecordReader<LongWritable,Text> getRecordReader(InputSplit split,
+    JobConf conf, Reporter reporter) throws IOException {
+    return new CombineFileRecordReader(conf, (CombineFileSplit)split, reporter,
+      TextRecordReaderWrapper.class);
+  }
+
+  /**
+   * A record reader that may be passed to <code>CombineFileRecordReader</code>
+   * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
+   * for <code>TextInputFormat</code>.
+   *
+   * @see CombineFileRecordReader
+   * @see CombineFileInputFormat
+   * @see TextInputFormat
+   */
+  private static class TextRecordReaderWrapper
+    extends CombineFileRecordReaderWrapper<LongWritable,Text> {
+    // this constructor signature is required by CombineFileRecordReader
+    public TextRecordReaderWrapper(CombineFileSplit split, Configuration conf,
+      Reporter reporter, Integer idx) throws IOException {
+      super(new TextInputFormat(), split, conf, reporter, idx);
+    }
+  }
+}
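
Combining splits does not change the mapper-side contract: keys are still byte
offsets within each underlying file and values are lines. A minimal old-API
mapper sketch (hypothetical class, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class LineCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {
      private static final LongWritable ONE = new LongWritable(1);
      private static final Text LINES = new Text("lines");

      public void map(LongWritable offset, Text line,
          OutputCollector<Text, LongWritable> out, Reporter reporter)
          throws IOException {
        out.collect(LINES, ONE);  // count every line across all small files
      }
    }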

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReaderWrapper.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A wrapper class for a record reader that handles a single file split. It
+ * delegates most of the methods to the wrapped instance. A concrete subclass
+ * needs to provide a constructor that calls this parent constructor with the
+ * appropriate input format. The subclass constructor must satisfy the specific
+ * constructor signature that is required by
+ * <code>CombineFileRecordReader</code>.
+ *
+ * Subclassing is needed to get a concrete record reader wrapper because of the
+ * constructor requirement.
+ *
+ * @see CombineFileRecordReader
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class CombineFileRecordReaderWrapper<K,V>
+  extends RecordReader<K,V> {
+  private final FileSplit fileSplit;
+  private final RecordReader<K,V> delegate;
+
+  protected CombineFileRecordReaderWrapper(FileInputFormat<K,V> inputFormat,
+    CombineFileSplit split, TaskAttemptContext context, Integer idx)
+    throws IOException, InterruptedException {
+    fileSplit = new FileSplit(split.getPath(idx),
+      split.getOffset(idx),
+      split.getLength(idx),
+      split.getLocations());
+
+    delegate = inputFormat.createRecordReader(fileSplit, context);
+  }
+
+  public void initialize(InputSplit split, TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    // it should be exactly the same file split that this wrapper instance
+    // was created with
+    assert fileSplitIsValid(context);
+
+    delegate.initialize(fileSplit, context);
+  }
+
+  private boolean fileSplitIsValid(TaskAttemptContext context) {
+    Configuration conf = context.getConfiguration();
+    long offset = conf.getLong(MRJobConfig.MAP_INPUT_START, 0L);
+    if (fileSplit.getStart() != offset) {
+      return false;
+    }
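+    // note: MRJobConfig.MAP_INPUT_PATH is believed to hold the key
+    // "mapreduce.map.input.length", so this reads the split length despite
+    // the constant's name (assumption about the constant's value)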
+    long length = conf.getLong(MRJobConfig.MAP_INPUT_PATH, 0L);
+    if (fileSplit.getLength() != length) {
+      return false;
+    }
+    String path = conf.get(MRJobConfig.MAP_INPUT_FILE);
+    if (!fileSplit.getPath().toString().equals(path)) {
+      return false;
+    }
+    return true;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return delegate.nextKeyValue();
+  }
+
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return delegate.getCurrentKey();
+  }
+
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return delegate.getCurrentValue();
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return delegate.getProgress();
+  }
+
+  public void close() throws IOException {
+    delegate.close();
+  }
+}
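
As with the old-API twin earlier in this commit, the subclass contract is
easiest to see in a sketch (hypothetical class, not part of this commit;
NLineInputFormat is just one new-API format a user might wrap):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

    public class NLineRecordReaderWrapper
        extends CombineFileRecordReaderWrapper<LongWritable, Text> {
      // this exact signature is required reflectively by
      // CombineFileRecordReader, one instance per file chunk
      public NLineRecordReaderWrapper(CombineFileSplit split,
          TaskAttemptContext context, Integer idx)
          throws IOException, InterruptedException {
        super(new NLineInputFormat(), split, context, idx);
      }
    }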

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineSequenceFileInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
+ * <code>SequenceFileInputFormat</code>.
+ *
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineSequenceFileInputFormat<K,V>
+  extends CombineFileInputFormat<K,V> {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public RecordReader<K,V> createRecordReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    return new CombineFileRecordReader((CombineFileSplit)split, context,
+      SequenceFileRecordReaderWrapper.class);
+  }
+
+  /**
+   * A record reader that may be passed to <code>CombineFileRecordReader</code>
+   * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
+   * for <code>SequenceFileInputFormat</code>.
+   *
+   * @see CombineFileRecordReader
+   * @see CombineFileInputFormat
+   * @see SequenceFileInputFormat
+   */
+  private static class SequenceFileRecordReaderWrapper<K,V>
+    extends CombineFileRecordReaderWrapper<K,V> {
+    // this constructor signature is required by CombineFileRecordReader
+    public SequenceFileRecordReaderWrapper(CombineFileSplit split,
+      TaskAttemptContext context, Integer idx)
+      throws IOException, InterruptedException {
+      super(new SequenceFileInputFormat<K,V>(), split, context, idx);
+    }
+  }
+}
\ No newline at end of file
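
New-API wiring mirrors the old one; a hedged sketch (hypothetical driver class
and input path; the mapper keeps the sequence files' own key/value types, e.g.
Mapper<IntWritable, BytesWritable, ...> as in the tests below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SeqFilesDriver {  // hypothetical driver
      public static Job createJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf);
        // many small sequence files per combined split
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/seqfiles"));
        return job;
      }
    }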

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineTextInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
+ * <code>TextInputFormat</code>.
+ *
+ * @see CombineFileInputFormat
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineTextInputFormat
+  extends CombineFileInputFormat<LongWritable,Text> {
+  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    return new CombineFileRecordReader<LongWritable,Text>(
+      (CombineFileSplit)split, context, TextRecordReaderWrapper.class);
+  }
+
+  /**
+   * A record reader that may be passed to <code>CombineFileRecordReader</code>
+   * so that it can be used in a <code>CombineFileInputFormat</code>-equivalent
+   * for <code>TextInputFormat</code>.
+   *
+   * @see CombineFileRecordReader
+   * @see CombineFileInputFormat
+   * @see TextInputFormat
+   */
+  private static class TextRecordReaderWrapper
+    extends CombineFileRecordReaderWrapper<LongWritable,Text> {
+    // this constructor signature is required by CombineFileRecordReader
+    public TextRecordReaderWrapper(CombineFileSplit split,
+      TaskAttemptContext context, Integer idx)
+      throws IOException, InterruptedException {
+      super(new TextInputFormat(), split, context, idx);
+    }
+  }
+}
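
The two added classes compose into further combine formats with very little
code; a sketch of a hypothetical KeyValueText equivalent following the exact
pattern of CombineTextInputFormat above (not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

    public class CombineKeyValueTextInputFormat
        extends CombineFileInputFormat<Text, Text> {
      public RecordReader<Text, Text> createRecordReader(InputSplit split,
          TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>(
          (CombineFileSplit)split, context,
          KeyValueTextRecordReaderWrapper.class);
      }

      private static class KeyValueTextRecordReaderWrapper
          extends CombineFileRecordReaderWrapper<Text, Text> {
        // constructor signature required by CombineFileRecordReader
        public KeyValueTextRecordReaderWrapper(CombineFileSplit split,
            TaskAttemptContext context, Integer idx)
            throws IOException, InterruptedException {
          super(new KeyValueTextInputFormat(), split, context, idx);
        }
      }
    }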

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineSequenceFileInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.lib.CombineSequenceFileInputFormat;
+import org.junit.Test;
+
+public class TestCombineSequenceFileInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineSequenceFileInputFormat.class);
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      conf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestCombineSequenceFileInputFormat").makeQualified(localFs);
+
+  @Test(timeout=10000)
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(conf);
+
+    Reporter reporter = Reporter.NULL;
+
+    Random random = new Random();
+    long seed = random.nextLong();
+    LOG.info("seed = "+seed);
+    random.setSeed(seed);
+
+    localFs.delete(workDir, true);
+
+    FileInputFormat.setInputPaths(job, workDir);
+
+    final int length = 10000;
+    final int numFiles = 10;
+
+    // create files with various lengths
+    createFiles(length, numFiles, random);
+
+    // create a combine split for the files
+    InputFormat<IntWritable, BytesWritable> format =
+      new CombineSequenceFileInputFormat<IntWritable, BytesWritable>();
+    IntWritable key = new IntWritable();
+    BytesWritable value = new BytesWritable();
+    for (int i = 0; i < 3; i++) {
+      int numSplits =
+        random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20))+1;
+      LOG.info("splitting: requesting = " + numSplits);
+      InputSplit[] splits = format.getSplits(job, numSplits);
+      LOG.info("splitting: got =        " + splits.length);
+
+      // we should have a single split as the length is comfortably smaller than
+      // the block size
+      assertEquals("We got more than one splits!", 1, splits.length);
+      InputSplit split = splits[0];
+      assertEquals("It should be CombineFileSplit",
+        CombineFileSplit.class, split.getClass());
+
+      // check each split
+      BitSet bits = new BitSet(length);
+      RecordReader<IntWritable, BytesWritable> reader =
+        format.getRecordReader(split, job, reporter);
+      try {
+        while (reader.next(key, value)) {
+          assertFalse("Key in multiple partitions.", bits.get(key.get()));
+          bits.set(key.get());
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+    }
+  }
+
+  private static class Range {
+    private final int start;
+    private final int end;
+
+    Range(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + start + ", " + end + ")";
+    }
+  }
+
+  private static Range[] createRanges(int length, int numFiles, Random random) {
+    // generate ranges that split [0, length) into numFiles pieces of various sizes
+    Range[] ranges = new Range[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      int start = i == 0 ? 0 : ranges[i-1].end;
+      int end = i == numFiles - 1 ?
+        length :
+        (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1;
+      ranges[i] = new Range(start, end);
+    }
+    return ranges;
+  }
+
+  private static void createFiles(int length, int numFiles, Random random)
+    throws IOException {
+    Range[] ranges = createRanges(length, numFiles, random);
+
+    for (int i = 0; i < numFiles; i++) {
+      Path file = new Path(workDir, "test_" + i + ".seq");
+      // create a file containing this range's entries
+      @SuppressWarnings("deprecation")
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(localFs, conf, file,
+                                  IntWritable.class, BytesWritable.class);
+      Range range = ranges[i];
+      try {
+        for (int j = range.start; j < range.end; j++) {
+          IntWritable key = new IntWritable(j);
+          byte[] data = new byte[random.nextInt(10)];
+          random.nextBytes(data);
+          BytesWritable value = new BytesWritable(data);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestCombineTextInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.fail;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.apache.hadoop.mapred.lib.CombineTextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+public class TestCombineTextInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineTextInputFormat.class);
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestCombineTextInputFormat").makeQualified(localFs);
+
+  // A reporter that does nothing
+  private static final Reporter voidReporter = Reporter.NULL;
+
+  @Test(timeout=10000)
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(defaultConf);
+
+    Random random = new Random();
+    long seed = random.nextLong();
+    LOG.info("seed = "+seed);
+    random.setSeed(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    final int length = 10000;
+    final int numFiles = 10;
+
+    createFiles(length, numFiles, random);
+
+    // create a combined split for the files
+    CombineTextInputFormat format = new CombineTextInputFormat();
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    for (int i = 0; i < 3; i++) {
+      int numSplits = random.nextInt(length/20)+1;
+      LOG.info("splitting: requesting = " + numSplits);
+      InputSplit[] splits = format.getSplits(job, numSplits);
+      LOG.info("splitting: got =        " + splits.length);
+
+      // we should have a single split as the length is comfortably smaller than
+      // the block size
+      assertEquals("We got more than one splits!", 1, splits.length);
+      InputSplit split = splits[0];
+      assertEquals("It should be CombineFileSplit",
+        CombineFileSplit.class, split.getClass());
+
+      // check the split
+      BitSet bits = new BitSet(length);
+      LOG.debug("split= " + split);
+      RecordReader<LongWritable, Text> reader =
+        format.getRecordReader(split, job, voidReporter);
+      try {
+        int count = 0;
+        while (reader.next(key, value)) {
+          int v = Integer.parseInt(value.toString());
+          LOG.debug("read " + v);
+          if (bits.get(v)) {
+            LOG.warn("conflict with " + v +
+                     " at position "+reader.getPos());
+          }
+          assertFalse("Key in multiple partitions.", bits.get(v));
+          bits.set(v);
+          count++;
+        }
+        LOG.info("splits="+split+" count=" + count);
+      } finally {
+        reader.close();
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+    }
+  }
+
+  private static class Range {
+    private final int start;
+    private final int end;
+
+    Range(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + start + ", " + end + ")";
+    }
+  }
+
+  private static Range[] createRanges(int length, int numFiles, Random random) {
+    // generate ranges that split [0, length) into numFiles pieces of various sizes
+    Range[] ranges = new Range[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      int start = i == 0 ? 0 : ranges[i-1].end;
+      int end = i == numFiles - 1 ?
+        length :
+        (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1;
+      ranges[i] = new Range(start, end);
+    }
+    return ranges;
+  }
+
+  private static void createFiles(int length, int numFiles, Random random)
+    throws IOException {
+    Range[] ranges = createRanges(length, numFiles, random);
+
+    for (int i = 0; i < numFiles; i++) {
+      Path file = new Path(workDir, "test_" + i + ".txt");
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      Range range = ranges[i];
+      try {
+        for (int j = range.start; j < range.end; j++) {
+          writer.write(Integer.toString(j));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name,
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
+                                      InputSplit split,
+                                      JobConf job) throws IOException {
+    List<Text> result = new ArrayList<Text>();
+    RecordReader<LongWritable, Text> reader =
+      format.getRecordReader(split, job, voidReporter);
+    LongWritable key = reader.createKey();
+    Text value = reader.createValue();
+    while (reader.next(key, value)) {
+      result.add(value);
+      value = reader.createValue();
+    }
+    reader.close();
+    return result;
+  }
+
+  /**
+   * Test using the gzip codec for reading
+   */
+  @Test(timeout=10000)
+  public void testGzip() throws IOException {
+    JobConf job = new JobConf(defaultConf);
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, job);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "this is a test\nof gzip\n");
+    FileInputFormat.setInputPaths(job, workDir);
+    CombineTextInputFormat format = new CombineTextInputFormat();
+    InputSplit[] splits = format.getSplits(job, 100);
+    assertEquals("compressed splits == 1", 1, splits.length);
+    List<Text> results = readSplit(format, splits[0], job);
+    assertEquals("splits[0] length", 8, results.size());
+
+    final String[] firstList =
+      {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
+    final String[] secondList = {"this is a test", "of gzip"};
+    String first = results.get(0).toString();
+    if (first.equals(firstList[0])) {
+      testResults(results, firstList, secondList);
+    } else if (first.equals(secondList[0])) {
+      testResults(results, secondList, firstList);
+    } else {
+      fail("unexpected first token!");
+    }
+  }
+
+  private static void testResults(List<Text> results, String[] first,
+    String[] second) {
+    for (int i = 0; i < first.length; i++) {
+      assertEquals("splits[0]["+i+"]", first[i], results.get(i).toString());
+    }
+    for (int i = 0; i < second.length; i++) {
+      int j = i + first.length;
+      assertEquals("splits[0]["+j+"]", second[i], results.get(j).toString());
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineSequenceFileInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.junit.Test;
+
+public class TestCombineSequenceFileInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineSequenceFileInputFormat.class);
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      conf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestCombineSequenceFileInputFormat");
+
+  @Test(timeout=10000)
+  public void testFormat() throws IOException, InterruptedException {
+    Job job = Job.getInstance(conf);
+
+    Random random = new Random();
+    long seed = random.nextLong();
+    LOG.info("seed = " + seed);
+    random.setSeed(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    final int length = 10000;
+    final int numFiles = 10;
+
+    // create files with a variety of lengths
+    createFiles(length, numFiles, random, job);
+
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    // create a combine split for the files
+    InputFormat<IntWritable,BytesWritable> format =
+      new CombineSequenceFileInputFormat<IntWritable,BytesWritable>();
+    for (int i = 0; i < 3; i++) {
+      int numSplits =
+        random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20)) + 1;
+      LOG.info("splitting: requesting = " + numSplits);
+      List<InputSplit> splits = format.getSplits(job);
+      LOG.info("splitting: got =        " + splits.size());
+
+      // we should have a single split as the length is comfortably smaller than
+      // the block size
+      assertEquals("We got more than one splits!", 1, splits.size());
+      InputSplit split = splits.get(0);
+      assertEquals("It should be CombineFileSplit",
+        CombineFileSplit.class, split.getClass());
+
+      // check the split
+      BitSet bits = new BitSet(length);
+      RecordReader<IntWritable,BytesWritable> reader =
+        format.createRecordReader(split, context);
+      MapContext<IntWritable,BytesWritable,IntWritable,BytesWritable> mcontext =
+        new MapContextImpl<IntWritable,BytesWritable,IntWritable,BytesWritable>(job.getConfiguration(),
+        context.getTaskAttemptID(), reader, null, null,
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+      assertEquals("reader class is CombineFileRecordReader.",
+        CombineFileRecordReader.class, reader.getClass());
+
+      try {
+        while (reader.nextKeyValue()) {
+          IntWritable key = reader.getCurrentKey();
+          BytesWritable value = reader.getCurrentValue();
+          assertNotNull("Value should not be null.", value);
+          final int k = key.get();
+          LOG.debug("read " + k);
+          assertFalse("Key in multiple partitions.", bits.get(k));
+          bits.set(k);
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+    }
+  }
+
+
+  private static class Range {
+    private final int start;
+    private final int end;
+
+    Range(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + start + ", " + end + ")";
+    }
+  }
+
+  private static Range[] createRanges(int length, int numFiles, Random random) {
+    // generate ranges that split [0, length) into numFiles pieces of various sizes
+    Range[] ranges = new Range[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      int start = i == 0 ? 0 : ranges[i-1].end;
+      int end = i == numFiles - 1 ?
+        length :
+        (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1;
+      ranges[i] = new Range(start, end);
+    }
+    return ranges;
+  }
+
+  private static void createFiles(int length, int numFiles, Random random,
+    Job job) throws IOException {
+    Range[] ranges = createRanges(length, numFiles, random);
+
+    for (int i = 0; i < numFiles; i++) {
+      Path file = new Path(workDir, "test_" + i + ".seq");
+      // create a file containing this range's entries
+      @SuppressWarnings("deprecation")
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(localFs, job.getConfiguration(), file,
+                                  IntWritable.class, BytesWritable.class);
+      Range range = ranges[i];
+      try {
+        for (int j = range.start; j < range.end; j++) {
+          IntWritable key = new IntWritable(j);
+          byte[] data = new byte[random.nextInt(10)];
+          random.nextBytes(data);
+          BytesWritable value = new BytesWritable(data);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java?rev=1471424&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java Wed Apr 24 14:11:50 2013
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.fail;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+public class TestCombineTextInputFormat {
+  private static final Log LOG =
+    LogFactory.getLog(TestCombineTextInputFormat.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestCombineTextInputFormat");
+
+  @Test(timeout=10000)
+  public void testFormat() throws Exception {
+    Job job = Job.getInstance(new Configuration(defaultConf));
+
+    Random random = new Random();
+    long seed = random.nextLong();
+    LOG.info("seed = " + seed);
+    random.setSeed(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    final int length = 10000;
+    final int numFiles = 10;
+
+    // create files with various lengths
+    createFiles(length, numFiles, random);
+
+    // create a combined split for the files
+    CombineTextInputFormat format = new CombineTextInputFormat();
+    for (int i = 0; i < 3; i++) {
+      int numSplits = random.nextInt(length/20) + 1;
+      LOG.info("splitting: requesting = " + numSplits);
+      List<InputSplit> splits = format.getSplits(job);
+      LOG.info("splitting: got =        " + splits.size());
+
+      // we should have a single split as the length is comfortably smaller than
+      // the block size
+      assertEquals("We got more than one splits!", 1, splits.size());
+      InputSplit split = splits.get(0);
+      assertEquals("It should be CombineFileSplit",
+        CombineFileSplit.class, split.getClass());
+
+      // check the split
+      BitSet bits = new BitSet(length);
+      LOG.debug("split= " + split);
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      RecordReader<LongWritable, Text> reader =
+        format.createRecordReader(split, context);
+      assertEquals("reader class is CombineFileRecordReader.",
+        CombineFileRecordReader.class, reader.getClass());
+      MapContext<LongWritable,Text,LongWritable,Text> mcontext =
+        new MapContextImpl<LongWritable,Text,LongWritable,Text>(job.getConfiguration(),
+        context.getTaskAttemptID(), reader, null, null,
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+
+      try {
+        int count = 0;
+        while (reader.nextKeyValue()) {
+          LongWritable key = reader.getCurrentKey();
+          assertNotNull("Key should not be null.", key);
+          Text value = reader.getCurrentValue();
+          final int v = Integer.parseInt(value.toString());
+          LOG.debug("read " + v);
+          assertFalse("Key in multiple partitions.", bits.get(v));
+          bits.set(v);
+          count++;
+        }
+        LOG.debug("split=" + split + " count=" + count);
+      } finally {
+        reader.close();
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+    }
+  }
+
+  private static class Range {
+    private final int start;
+    private final int end;
+
+    Range(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public String toString() {
+      return "(" + start + ", " + end + ")";
+    }
+  }
+
+  private static Range[] createRanges(int length, int numFiles, Random random) {
+    // generate ranges that split [0, length) into numFiles pieces of various sizes
+    Range[] ranges = new Range[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      int start = i == 0 ? 0 : ranges[i-1].end;
+      int end = i == numFiles - 1 ?
+        length :
+        (length/numFiles)*(2*i + 1)/2 + random.nextInt(length/numFiles) + 1;
+      ranges[i] = new Range(start, end);
+    }
+    return ranges;
+  }
+
+  private static void createFiles(int length, int numFiles, Random random)
+    throws IOException {
+    Range[] ranges = createRanges(length, numFiles, random);
+
+    for (int i = 0; i < numFiles; i++) {
+      Path file = new Path(workDir, "test_" + i + ".txt");
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      Range range = ranges[i];
+      try {
+        for (int j = range.start; j < range.end; j++) {
+          writer.write(Integer.toString(j));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name,
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
+    InputSplit split, Job job) throws IOException, InterruptedException {
+    List<Text> result = new ArrayList<Text>();
+    Configuration conf = job.getConfiguration();
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(conf);
+    RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
+      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+    MapContext<LongWritable,Text,LongWritable,Text> mcontext =
+      new MapContextImpl<LongWritable,Text,LongWritable,Text>(conf,
+      context.getTaskAttemptID(), reader, null, null,
+      MapReduceTestUtil.createDummyReporter(),
+      split);
+    reader.initialize(split, mcontext);
+    while (reader.nextKeyValue()) {
+      result.add(new Text(reader.getCurrentValue()));
+    }
+    return result;
+  }
+
+  /**
+   * Test using the gzip codec for reading
+   */
+  @Test(timeout=10000)
+  public void testGzip() throws IOException, InterruptedException {
+    Configuration conf = new Configuration(defaultConf);
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, conf);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+              "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "this is a test\nof gzip\n");
+    Job job = Job.getInstance(conf);
+    FileInputFormat.setInputPaths(job, workDir);
+    CombineTextInputFormat format = new CombineTextInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    assertEquals("compressed splits == 1", 1, splits.size());
+    List<Text> results = readSplit(format, splits.get(0), job);
+    assertEquals("splits[0] length", 8, results.size());
+
+    final String[] firstList =
+      {"the quick", "brown", "fox jumped", "over", " the lazy", " dog"};
+    final String[] secondList = {"this is a test", "of gzip"};
+    String first = results.get(0).toString();
+    if (first.equals(firstList[0])) {
+      testResults(results, firstList, secondList);
+    } else if (first.equals(secondList[0])) {
+      testResults(results, secondList, firstList);
+    } else {
+      fail("unexpected first token!");
+    }
+  }
+
+  private static void testResults(List<Text> results, String[] first,
+    String[] second) {
+    for (int i = 0; i < first.length; i++) {
+      assertEquals("splits[0]["+i+"]", first[i], results.get(i).toString());
+    }
+    for (int i = 0; i < second.length; i++) {
+      int j = i + first.length;
+      assertEquals("splits[0]["+j+"]", second[i], results.get(j).toString());
+    }
+  }
+}