You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/07/10 10:53:51 UTC
svn commit: r792839 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/lib/input/
src/test/mapred/org/apache/hadoop/mapreduce/
src/test/mapred/org/apache/hadoop/mapreduce/lib/input/
Author: sharad
Date: Fri Jul 10 08:53:51 2009
New Revision: 792839
URL: http://svn.apache.org/viewvc?rev=792839&view=rev
Log:
MAPREDUCE-655. Change KeyValueLineRecordReader and KeyValueTextInputFormat to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 10 08:53:51 2009
@@ -90,6 +90,9 @@
MAPREDUCE-623. Resolve javac warnings in mapreduce. (Jothi Padmanabhan
via sharad)
+ MAPREDUCE-655. Change KeyValueLineRecordReader and KeyValueTextInputFormat
+ to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
BUG FIXES
MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
(Aaron Kimball via matei)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Fri Jul 10 08:53:51 2009
@@ -29,7 +29,12 @@
* separator character. The separator can be specified in config file
* under the attribute name key.value.separator.in.input.line. The default
* separator is the tab character ('\t').
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader}
+ * instead
*/
+@Deprecated
public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
private final LineRecordReader lineRecordReader;
@@ -60,20 +65,15 @@
this.separator = (byte) sepStr.charAt(0);
}
- public static int findSeparator(byte[] utf, int start, int length, byte sep) {
- for (int i = start; i < (start + length); i++) {
- if (utf[i] == sep) {
- return i;
- }
- }
- return -1;
+ public static int findSeparator(byte[] utf, int start, int length,
+ byte sep) {
+ return org.apache.hadoop.mapreduce.lib.input.
+ KeyValueLineRecordReader.findSeparator(utf, start, length, sep);
}
/** Read key/value pair in a line. */
public synchronized boolean next(Text key, Text value)
throws IOException {
- Text tKey = key;
- Text tValue = value;
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.next(dummyKey, innerValue)) {
@@ -85,19 +85,8 @@
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
- if (pos == -1) {
- tKey.set(line, 0, lineLen);
- tValue.set("");
- } else {
- int keyLen = pos;
- byte[] keyBytes = new byte[keyLen];
- System.arraycopy(line, 0, keyBytes, 0, keyLen);
- int valLen = lineLen - keyLen - 1;
- byte[] valBytes = new byte[valLen];
- System.arraycopy(line, pos + 1, valBytes, 0, valLen);
- tKey.set(keyBytes);
- tValue.set(valBytes);
- }
+ org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader.
+ setKeyValue(key, value, line, lineLen, pos);
return true;
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -30,7 +30,12 @@
* Either linefeed or carriage-return are used to signal end of line. Each line
* is divided into key and value parts by a separator byte. If no such a byte
* exists, the key will be the entire line and value will be empty.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat}
+ * instead
*/
+@Deprecated
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>
implements JobConfigurable {
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a
+ * separator character. The separator can be specified in config file
+ * under the attribute name key.value.separator.in.input.line. The default
+ * separator is the tab character ('\t').
+ */
+public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
+
+ private final LineRecordReader lineRecordReader;
+
+ private byte separator = (byte) '\t';
+
+ private Text innerValue;
+
+ private Text key;
+
+ private Text value;
+
+ public Class<?> getKeyClass() { return Text.class; }
+
+ public KeyValueLineRecordReader(Configuration conf)
+ throws IOException {
+
+ lineRecordReader = new LineRecordReader();
+ String sepStr = conf.get("key.value.separator.in.input.line", "\t");
+ this.separator = (byte) sepStr.charAt(0);
+ }
+
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ lineRecordReader.initialize(genericSplit, context);
+ }
+
+ public static int findSeparator(byte[] utf, int start, int length,
+ byte sep) {
+ for (int i = start; i < (start + length); i++) {
+ if (utf[i] == sep) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public static void setKeyValue(Text key, Text value, byte[] line,
+ int lineLen, int pos) {
+ if (pos == -1) {
+ key.set(line, 0, lineLen);
+ value.set("");
+ } else {
+ int keyLen = pos;
+ byte[] keyBytes = new byte[keyLen];
+ System.arraycopy(line, 0, keyBytes, 0, keyLen);
+ int valLen = lineLen - keyLen - 1;
+ byte[] valBytes = new byte[valLen];
+ System.arraycopy(line, pos + 1, valBytes, 0, valLen);
+ key.set(keyBytes);
+ value.set(valBytes);
+ }
+ }
+ /** Read key/value pair in a line. */
+ public synchronized boolean nextKeyValue()
+ throws IOException {
+ byte[] line = null;
+ int lineLen = -1;
+ if (lineRecordReader.nextKeyValue()) {
+ innerValue = lineRecordReader.getCurrentValue();
+ line = innerValue.getBytes();
+ lineLen = innerValue.getLength();
+ } else {
+ return false;
+ }
+ if (line == null)
+ return false;
+ if (key == null) {
+ key = new Text();
+ }
+ if (value == null) {
+ value = new Text();
+ }
+ int pos = findSeparator(line, 0, lineLen, this.separator);
+ setKeyValue(key, value, line, lineLen, pos);
+ return true;
+ }
+
+ public Text getCurrentKey() {
+ return key;
+ }
+
+ public Text getCurrentValue() {
+ return value;
+ }
+
+ public float getProgress() {
+ return lineRecordReader.getProgress();
+ }
+
+ public synchronized void close() throws IOException {
+ lineRecordReader.close();
+ }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either a line feed or a carriage return is used to signal end of line.
+ * Each line is divided into key and value parts by a separator byte. If no
+ * such byte exists, the key will be the entire line and the value will be
+ * empty.
+ */
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
+
+  /**
+   * Reports a file as splitable only when no compression codec is registered
+   * for its path.
+   *
+   * @param context job context supplying the configuration used to look up
+   *        codecs
+   * @param file the input file in question
+   * @return true if no codec matches <code>file</code>
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodec codec =
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+
+  /**
+   * Creates a {@link KeyValueLineRecordReader} configured from the task's
+   * configuration. The split's string form is set as the task status first.
+   *
+   * @param genericSplit the split the reader will be used for
+   * @param context the task attempt context
+   * @return a new, not yet initialized record reader
+   */
+  @Override
+  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+
+    context.setStatus(genericSplit.toString());
+    return new KeyValueLineRecordReader(context.getConfiguration());
+  }
+
+}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=792839&r1=792838&r2=792839&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jul 10 08:53:51 2009
@@ -258,4 +258,26 @@
job.setNumReduceTasks(numReds);
return job;
}
+
+  /**
+   * Builds a {@link TaskAttemptContext} for a fixed, made-up map task attempt.
+   * As a side effect the attempt id is stored in <code>conf</code> under
+   * mapred.task.id.
+   *
+   * @param conf configuration to update and to wrap in the context
+   * @return a context for the dummy map task attempt
+   */
+  public static TaskAttemptContext createDummyMapTaskAttemptContext(
+      Configuration conf) {
+    final TaskAttemptID attemptId =
+      new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+    final String attemptName = attemptId.toString();
+    conf.set("mapred.task.id", attemptName);
+    return new TaskAttemptContext(conf, attemptId);
+  }
+
+ public static StatusReporter createDummyReporter() {
+ return new StatusReporter() {
+ public void setStatus(String s) {
+ }
+ public void progress() {
+ }
+ public Counter getCounter(Enum<?> name) {
+ return new Counters().findCounter(name);
+ }
+ public Counter getCounter(String group, String name) {
+ return new Counters().findCounter(group, name);
+ }
+ };
+ }
}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java?rev=792839&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java Fri Jul 10 08:53:51 2009
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Tests for {@link KeyValueTextInputFormat} and the line handling it relies
+ * on: reading generated tab-separated files split by split, reading gzip
+ * compressed input, and {@link LineReader} behavior for UTF-8 text and the
+ * different line terminators.
+ */
+public class TestMRKeyValueTextInputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMRKeyValueTextInputFormat.class.getName());
+
+  /** Upper bound (exclusive) on the number of entries written per file. */
+  private static final int MAX_LENGTH = 10000;
+
+  private static final Configuration defaultConf = new Configuration();
+  private static final FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  /** Directory under test.build.data that holds the generated input. */
+  private static final Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestKeyValueTextInputFormat");
+
+  /**
+   * Writes files of "2*i TAB i" lines for a range of randomly stepped sizes
+   * and checks that reading all splits yields every value exactly once.
+   */
+  public void testFormat() throws Exception {
+    Job job = new Job(new Configuration());
+    Path file = new Path(workDir, "test.txt");
+
+    // The seed is logged so that a failing run can be reproduced.
+    int seed = new Random().nextInt();
+    LOG.info("seed = " + seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+      LOG.debug("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i * 2));
+          writer.write("\t");
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+      for (int i = 0; i < 3; i++) {
+        // NOTE: numSplits is only logged; getSplits(job) is not given it.
+        int numSplits = random.nextInt(MAX_LENGTH / 20) + 1;
+        LOG.debug("splitting: requesting = " + numSplits);
+        List<InputSplit> splits = format.getSplits(job);
+        LOG.debug("splitting: got =        " + splits.size());
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.size(); j++) {
+          LOG.debug("split["+j+"]= " + splits.get(j));
+          TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+          RecordReader<Text, Text> reader = format.createRecordReader(
+            splits.get(j), context);
+          Class<?> clazz = reader.getClass();
+          assertEquals("reader class is KeyValueLineRecordReader.",
+            KeyValueLineRecordReader.class, clazz);
+          MapContext<Text, Text, Text, Text> mcontext =
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(),
+            context.getTaskAttemptID(), reader, null, null,
+            MapReduceTestUtil.createDummyReporter(), splits.get(j));
+          reader.initialize(splits.get(j), mcontext);
+
+          Text key = null;
+          Text value = null;
+          try {
+            int count = 0;
+            while (reader.nextKeyValue()) {
+              key = reader.getCurrentKey();
+              clazz = key.getClass();
+              assertEquals("Key class is Text.", Text.class, clazz);
+              value = reader.getCurrentValue();
+              clazz = value.getClass();
+              assertEquals("Value class is Text.", Text.class, clazz);
+              // Each value is the entry index, so a repeated bit means the
+              // same record was returned by more than one split.
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              count++;
+            }
+            LOG.debug("splits[" + j + "]=" + splits.get(j) +" count=" + count);
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  /** Wraps the UTF-8 bytes of <code>str</code> in a {@link LineReader}. */
+  private LineReader makeStream(String str) throws IOException {
+    return new LineReader(new ByteArrayInputStream
+                                           (str.getBytes("UTF-8")),
+                                           defaultConf);
+  }
+
+  /**
+   * Checks that non-ASCII characters survive readLine and that U+200A is not
+   * treated as a line break.
+   */
+  public void testUTF8() throws Exception {
+    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
+    Text line = new Text();
+    in.readLine(line);
+    assertEquals("readLine changed utf8 characters",
+                 "abcd\u20acbdcd\u20ac", line.toString());
+    in = makeStream("abc\u200axyz");
+    in.readLine(line);
+    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+  }
+
+  /**
+   * Checks the length of each line read from input that mixes "\n", "\r"
+   * and "\r\n" terminators and ends without one.
+   */
+  public void testNewLines() throws Exception {
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out);
+    assertEquals("line2 length", 2, out.getLength());
+    in.readLine(out);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out);
+    assertEquals("line6 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
+  }
+
+  /**
+   * Writes <code>contents</code> to <code>name</code>, through
+   * <code>codec</code> when one is given.
+   *
+   * @param fs file system to create the file on
+   * @param name path of the file to create
+   * @param codec compression codec to wrap the stream with, or null for none
+   * @param contents text to write, encoded as UTF-8
+   */
+  private static void writeFile(FileSystem fs, Path name,
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    // Close the stream even if the write fails.
+    try {
+      stm.write(contents.getBytes("UTF-8"));
+    } finally {
+      stm.close();
+    }
+  }
+
+  /**
+   * Reads every record of <code>split</code> and returns copies of the
+   * values in the order they were read.
+   *
+   * @param format input format used to create the record reader
+   * @param split the split to read
+   * @param job job whose configuration is used for the dummy task context
+   * @return the values read from the split
+   */
+  private static List<Text> readSplit(KeyValueTextInputFormat format,
+      InputSplit split, Job job) throws IOException, InterruptedException {
+    List<Text> result = new ArrayList<Text>();
+    Configuration conf = job.getConfiguration();
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(conf);
+    RecordReader<Text, Text> reader = format.createRecordReader(split,
+      context);
+    try {
+      MapContext<Text, Text, Text, Text> mcontext =
+        new MapContext<Text, Text, Text, Text>(conf,
+        context.getTaskAttemptID(), reader, null, null,
+        MapReduceTestUtil.createDummyReporter(),
+        split);
+      reader.initialize(split, mcontext);
+      while (reader.nextKeyValue()) {
+        // Copy the value; getCurrentValue() is copied here rather than stored.
+        result.add(new Text(reader.getCurrentValue()));
+      }
+    } finally {
+      reader.close();
+    }
+    return result;
+  }
+
+  /**
+   * Test using the gzip codec for reading
+   */
+  public static void testGzip() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, conf);
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+      "line-1\tthe quick\nline-2\tbrown\nline-3\t" +
+      "fox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+      "line-1\tthis is a test\nline-1\tof gzip\n");
+    Job job = new Job(conf);
+    FileInputFormat.setInputPaths(job, workDir);
+    KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    assertEquals("compressed splits == 2", 2, splits.size());
+    // Put the split for part1.txt.gz first so the assertions below do not
+    // depend on the order in which the splits were returned.
+    FileSplit tmp = (FileSplit) splits.get(0);
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits.set(0, splits.get(1));
+      splits.set(1, tmp);
+    }
+    List<Text> results = readSplit(format, splits.get(0), job);
+    assertEquals("splits[0] length", 6, results.size());
+    assertEquals("splits[0][0]", "the quick", results.get(0).toString());
+    assertEquals("splits[0][1]", "brown", results.get(1).toString());
+    assertEquals("splits[0][2]", "fox jumped", results.get(2).toString());
+    assertEquals("splits[0][3]", "over", results.get(3).toString());
+    assertEquals("splits[0][4]", " the lazy", results.get(4).toString());
+    assertEquals("splits[0][5]", " dog", results.get(5).toString());
+    results = readSplit(format, splits.get(1), job);
+    assertEquals("splits[1] length", 2, results.size());
+    assertEquals("splits[1][0]", "this is a test",
+                 results.get(0).toString());
+    assertEquals("splits[1][1]", "of gzip",
+                 results.get(1).toString());
+  }
+
+  /** Runs {@link #testFormat()} outside of a test runner. */
+  public static void main(String[] args) throws Exception {
+    new TestMRKeyValueTextInputFormat().testFormat();
+  }
+}