You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jo...@apache.org on 2008/11/24 11:51:29 UTC
svn commit: r720162 - in /hadoop/core/trunk: CHANGES.txt
src/core/org/apache/hadoop/io/compress/LzopCodec.java
src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java
src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java
Author: johan
Date: Mon Nov 24 02:51:28 2008
New Revision: 720162
URL: http://svn.apache.org/viewvc?rev=720162&view=rev
Log:
HADOOP-4640. Adds an input format that can split lzo compressed text files. (johan)
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/LzopCodec.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720162&r1=720161&r2=720162&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 24 02:51:28 2008
@@ -123,6 +123,9 @@
it down by monitoring for cumulative memory usage across tasks.
(Vinod Kumar Vavilapalli via yhemanth)
+ HADOOP-4640. Adds an input format that can split lzo compressed
+ text files. (johan)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to provide locations for splits
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/LzopCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/LzopCodec.java?rev=720162&r1=720161&r2=720162&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/LzopCodec.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/LzopCodec.java Mon Nov 24 02:51:28 2008
@@ -408,18 +408,21 @@
}
public void close() throws IOException {
+ byte[] b = new byte[4096];
+ while (!decompressor.finished()) {
+ decompressor.decompress(b, 0, b.length);
+ }
super.close();
verifyChecksums();
}
}
- protected static class LzopDecompressor extends LzoDecompressor {
+ public static class LzopDecompressor extends LzoDecompressor {
private EnumMap<DChecksum,Checksum> chkDMap =
new EnumMap<DChecksum,Checksum>(DChecksum.class);
private EnumMap<CChecksum,Checksum> chkCMap =
new EnumMap<CChecksum,Checksum>(CChecksum.class);
- private final int bufferSize;
/**
* Create an LzoDecompressor with LZO1X strategy (the only lzo algorithm
@@ -427,10 +430,18 @@
*/
public LzopDecompressor(int bufferSize) {
super(LzoDecompressor.CompressionStrategy.LZO1X_SAFE, bufferSize);
- this.bufferSize = bufferSize;
}
/**
+ * Get the number of checksum implementations
+ * the current lzo file uses.
+ * @return Number of checksum implementations in use.
+ */
+ public int getChecksumsCount() {
+ return this.chkCMap.size() + this.chkDMap.size();
+ }
+
+ /**
+   * Given the sets of decompressed and compressed checksum flags in use,
+   * initialize this decompressor's checksum state.
*/
public void initHeaderFlags(EnumSet<DChecksum> dflags,
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java?rev=720162&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LzoTextInputFormat.java Mon Nov 24 02:51:28 2008
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.LzopCodec;
+import org.apache.hadoop.io.compress.LzopCodec.LzopDecompressor;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * An {@link InputFormat} for lzop compressed text files. Files are broken into
+ * lines. Either linefeed or carriage-return are used to signal end of line.
+ * Keys are the position in the file, and values are the line of text.
+ */
+public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text>
+ implements JobConfigurable {
+
+ private static final Log LOG
+ = LogFactory.getLog(LzoTextInputFormat.class.getName());
+
+ public static final String LZO_INDEX_SUFFIX = ".index";
+
/**
 * Install {@link LzopFilter} as the job's input path filter so the
 * ".index" side files are never handed to mappers as input.
 */
public void configure(JobConf conf) {
  FileInputFormat.setInputPathFilter(conf, LzopFilter.class);
}
+
+ /**
+ * We don't want to process the index files.
+ */
+ static class LzopFilter implements PathFilter {
+ public boolean accept(Path path) {
+ if (path.toString().endsWith(LZO_INDEX_SUFFIX)) {
+ return false;
+ }
+ return true;
+ }
+ }
+
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ Path indexFile = new Path(file.toString()
+ + LzoTextInputFormat.LZO_INDEX_SUFFIX);
+
+ try {
+ // can't split without the index
+ return fs.exists(indexFile);
+ } catch (IOException e) {
+ LOG.warn("Could not check if index file exists", e);
+ return false;
+ }
+ }
+
+ public RecordReader<LongWritable, Text> getRecordReader(
+ InputSplit genericSplit, JobConf job, Reporter reporter)
+ throws IOException {
+
+ reporter.setStatus(genericSplit.toString());
+ return new LzoLineRecordReader(job, (FileSplit) genericSplit);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ FileSplit[] splits = (FileSplit[]) super.getSplits(job, numSplits);
+ // find new start/ends of the filesplit that aligns
+ // with the lzo blocks
+
+ List<FileSplit> result = new ArrayList<FileSplit>();
+ FileSystem fs = FileSystem.get(job);
+
+ Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
+ for (int i = 0; i < splits.length; i++) {
+ FileSplit fileSplit = splits[i];
+ // load the index
+ Path file = fileSplit.getPath();
+ if (!indexes.containsKey(file)) {
+ LzoIndex index = readIndex(file, fs);
+ if (index.isEmpty()) {
+ // keep it as is since we didn't find an index
+ result.add(fileSplit);
+ continue;
+ }
+
+ indexes.put(file, index);
+ }
+
+ LzoIndex index = indexes.get(file);
+ long start = fileSplit.getStart();
+ long end = start + fileSplit.getLength();
+
+ if (start != 0) {
+ // find the next block position from
+ // the start of the split
+ long newStart = index.findNextPosition(start);
+ if (newStart == -1 || newStart >= end) {
+ // just skip this since it will be handled by another split
+ continue;
+ }
+ start = newStart;
+ }
+
+ long newEnd = index.findNextPosition(end);
+ if (newEnd != -1) {
+ end = newEnd;
+ }
+
+ result.add(new FileSplit(file, start, end - start, fileSplit
+ .getLocations()));
+ }
+
+ return result.toArray(new FileSplit[] {});
+ }
+
/**
 * Read the block index of an lzo file. The index file is a flat
 * sequence of 8-byte (long) block start offsets.
 *
 * @param file Read the index of this lzo file.
 * @param fs The index file is on this file system.
 * @return The parsed index, or an empty index when no index file exists
 *         (callers then fall back to the unsplittable mode).
 * @throws IOException If the index file cannot be read.
 */
private LzoIndex readIndex(Path file, FileSystem fs) throws IOException {
  FSDataInputStream indexIn = null;
  try {
    Path indexFile = new Path(file.toString() + LZO_INDEX_SUFFIX);
    if (!fs.exists(indexFile)) {
      // return empty index, fall back to the unsplittable mode
      return new LzoIndex();
    }

    long indexLen = fs.getFileStatus(indexFile).getLen();
    // one long (8 bytes) per block; any trailing partial entry is ignored
    int blocks = (int) (indexLen / 8);
    LzoIndex index = new LzoIndex(blocks);
    indexIn = fs.open(indexFile);
    for (int i = 0; i < blocks; i++) {
      index.set(i, indexIn.readLong());
    }
    return index;
  } finally {
    if (indexIn != null) {
      indexIn.close();
    }
  }
}
+
/**
 * Index an lzo file so the input format can split it into separate map
 * tasks. The index written is a flat sequence of 8-byte block start
 * offsets, matching what {@code readIndex} expects.
 *
 * @param fs File system that contains the file.
 * @param lzoFile the lzo file to index.
 * @throws IOException If the file cannot be read or the index written.
 */
public static void createIndex(FileSystem fs, Path lzoFile)
  throws IOException {

  Configuration conf = fs.getConf();
  LzopCodec codec = new LzopCodec();
  codec.setConf(conf);

  FSDataInputStream is = null;
  FSDataOutputStream os = null;
  try {
    is = fs.open(lzoFile);
    os = fs.create(new Path(lzoFile.toString()
        + LzoTextInputFormat.LZO_INDEX_SUFFIX));
    LzopDecompressor decompressor = (LzopDecompressor) codec
        .createDecompressor();
    // for reading the header; after this call `is` is positioned at the
    // first block, and the decompressor knows which checksums are in use
    codec.createInputStream(is, decompressor);

    int numChecksums = decompressor.getChecksumsCount();

    // walk the block framing: each block is
    // <uncompressed size><compressed size><checksums...><data>
    while (true) {
      //read and ignore, we just want to get to the next int
      int uncompressedBlockSize = is.readInt();
      if (uncompressedBlockSize == 0) {
        // a zero uncompressed size marks end of stream
        break;
      } else if (uncompressedBlockSize < 0) {
        throw new EOFException();
      }

      int compressedBlockSize = is.readInt();
      if (compressedBlockSize <= 0) {
        throw new IOException("Could not read compressed block size");
      }

      long pos = is.getPos();
      // write the pos of the block start (back up over the two ints
      // just read, 8 bytes total)
      os.writeLong(pos - 8);
      // seek to the start of the next block, skip any checksums
      // (each checksum is a 4-byte value)
      is.seek(pos + compressedBlockSize + (4 * numChecksums));
    }
  } finally {
    if (is != null) {
      is.close();
    }

    if (os != null) {
      os.close();
    }
  }
}
+
+ /**
+ * Represents the lzo index.
+ */
+ static class LzoIndex {
+
+ private long[] blockPositions;
+
+ LzoIndex() {
+ }
+
+ LzoIndex(int blocks) {
+ blockPositions = new long[blocks];
+ }
+
+ /**
+ * Set the position for the block.
+ * @param blockNumber Block to set pos for.
+ * @param pos Position.
+ */
+ public void set(int blockNumber, long pos) {
+ blockPositions[blockNumber] = pos;
+ }
+
+ /**
+ * Find the next lzo block start from the given position.
+ * @param pos The position to start looking from.
+ * @return Either the start position of the block or -1 if
+ * it couldn't be found.
+ */
+ public long findNextPosition(long pos) {
+ int block = Arrays.binarySearch(blockPositions, pos);
+
+ if(block >= 0) {
+ //direct hit on a block start position
+ return blockPositions[block];
+ } else {
+ block = Math.abs(block) - 1;
+ if(block > blockPositions.length - 1) {
+ return -1;
+ }
+ return blockPositions[block];
+ }
+ }
+
+ public boolean isEmpty() {
+ return blockPositions == null || blockPositions.length == 0;
+ }
+
+ }
+
/**
 * Reads line from an lzo compressed text file. Treats keys as offset in file
 * and value as line.
 */
static class LzoLineRecordReader implements RecordReader<LongWritable, Text> {

  private CompressionCodecFactory compressionCodecs = null;
  // first position of the split, moved past a partial first line below
  private long start;
  // position in the compressed file, as reported by fileIn.getPos()
  private long pos;
  private long end;
  // line reader over the decompressed stream
  private LineReader in;
  // the raw (compressed) file stream underneath the codec
  private FSDataInputStream fileIn;

  /**
   * Opens the split's file, reads the lzop header, and positions the
   * stream at the first full line inside the split.
   *
   * @param job Configuration used to resolve the file system and codec.
   * @param split The file split to read.
   * @throws IOException If no lzo codec is configured for the file or
   *         the file cannot be opened.
   */
  public LzoLineRecordReader(Configuration job, FileSplit split)
      throws IOException {

    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    FileSystem fs = file.getFileSystem(job);

    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (codec == null) {
      throw new IOException("No lzo codec found, cannot run");
    }

    // open the file and seek to the start of the split
    fileIn = fs.open(split.getPath());

    // creates input stream and also reads the file header
    in = new LineReader(codec.createInputStream(fileIn), job);

    if (start != 0) {
      // NOTE(review): seeking the underlying compressed stream after the
      // header has been consumed assumes `start` is an lzo block boundary
      // (as produced by getSplits from the index) -- confirm against
      // LzopCodec's stream implementation.
      fileIn.seek(start);

      // read and ignore the first line; it may be a fragment belonging
      // to the previous split
      in.readLine(new Text());
      start = fileIn.getPos();
    }

    this.pos = start;
  }

  public LongWritable createKey() {
    return new LongWritable();
  }

  public Text createValue() {
    return new Text();
  }

  /** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
      throws IOException {

    //since the lzop codec reads everything in lzo blocks
    //we can't stop if the pos == end
    //instead we wait for the next block to be read in when
    //pos will be > end
    while (pos <= end) {
      key.set(pos);

      int newSize = in.readLine(value);
      if (newSize == 0) {
        // end of stream
        return false;
      }
      // advance to the compressed-stream position after the read
      pos = fileIn.getPos();

      return true;
    }

    return false;
  }

  /**
   * Get the progress within the split.
   */
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  public synchronized long getPos() throws IOException {
    return pos;
  }

  public synchronized void close() throws IOException {
    // closing the line reader also closes the wrapped file stream
    if (in != null) {
      in.close();
    }
  }
}
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java?rev=720162&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestLzoTextInputFormat.java Mon Nov 24 02:51:28 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.LzopCodec;
+import org.apache.hadoop.mapred.LzoTextInputFormat.LzoIndex;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * Test the LzoTextInputFormat, make sure it splits the file properly and
+ * returns the right data.
+ */
+public class TestLzoTextInputFormat extends TestCase {
+
+ private static final Log LOG
+ = LogFactory.getLog(TestLzoTextInputFormat.class.getName());
+
+ private MessageDigest md5;
+ private String lzoFileName = "file";
+
/** Creates the MD5 digest used to compare written and re-read data. */
@Override
protected void setUp() throws Exception {
  super.setUp();
  md5 = MessageDigest.getInstance("MD5");
}
+
+ /**
+ * Make sure the lzo index class works as described.
+ */
+ public void testLzoIndex() {
+ LzoIndex index = new LzoIndex();
+ assertTrue(index.isEmpty());
+ index = new LzoIndex(4);
+ index.set(0, 0);
+ index.set(1, 5);
+ index.set(2, 10);
+ index.set(3, 15);
+ assertFalse(index.isEmpty());
+
+ assertEquals(0, index.findNextPosition(-1));
+ assertEquals(5, index.findNextPosition(1));
+ assertEquals(5, index.findNextPosition(5));
+ assertEquals(15, index.findNextPosition(11));
+ assertEquals(15, index.findNextPosition(15));
+ assertEquals(-1, index.findNextPosition(16));
+ }
+
/**
 * Index the file and make sure it splits properly (the indexed file
 * should produce the requested number of splits).
 * @throws NoSuchAlgorithmException
 * @throws IOException
 */
public void testWithIndex() throws NoSuchAlgorithmException, IOException {
  runTest(true);
}
+
/**
 * Don't index the file and make sure it can be processed anyway
 * (it should come back as a single unsplit input).
 * @throws NoSuchAlgorithmException
 * @throws IOException
 */
public void testWithoutIndex() throws NoSuchAlgorithmException, IOException {
  runTest(false);
}
+
/**
 * Generates lzo-compressed test data, optionally indexes it, then reads
 * every record back through LzoTextInputFormat and compares the MD5 of
 * the lines read against the digest computed while writing.
 *
 * @param testWithIndex Whether to create an index first; with an index
 *        the file should split into {@code numSplits} pieces, without it
 *        the whole file must come back as a single split.
 * @throws IOException
 * @throws NoSuchAlgorithmException
 */
private void runTest(boolean testWithIndex) throws IOException,
    NoSuchAlgorithmException {

  if (!NativeCodeLoader.isNativeCodeLoaded()) {
    // lzo is backed by native code; skip quietly when it isn't loaded
    LOG.warn("Cannot run this test without the native lzo libraries");
    return;
  }

  String attempt = "attempt_200707121733_0001_m_000000_0";
  Path workDir = new Path(new Path(new Path(System.getProperty(
      "test.build.data", "."), "data"), FileOutputCommitter.TEMP_DIR_NAME),
      "_" + attempt);
  Path outputDir = workDir.getParent().getParent();

  JobConf conf = new JobConf();
  conf.set("mapred.task.id", attempt);
  conf.set("io.compression.codecs", LzopCodec.class.getName());

  FileSystem localFs = FileSystem.getLocal(conf);
  localFs.delete(workDir, true);
  localFs.mkdirs(workDir);
  FileInputFormat.setInputPaths(conf, outputDir);


  // create some input data
  byte[] expectedMd5 = createTestInput(outputDir, workDir, conf, localFs);

  if(testWithIndex) {
    Path lzoFile = new Path(workDir, lzoFileName);
    LzoTextInputFormat.createIndex(localFs, new Path(lzoFile
        + new LzopCodec().getDefaultExtension()));
  }

  LzoTextInputFormat inputFormat = new LzoTextInputFormat();
  inputFormat.configure(conf);

  //it's left in the work dir
  FileInputFormat.setInputPaths(conf, workDir);

  int numSplits = 3;
  InputSplit[] is = inputFormat.getSplits(conf, numSplits);
  if(testWithIndex) {
    assertEquals(numSplits, is.length);
  } else {
    // without an index the file is unsplittable: exactly one split
    assertEquals(1, is.length);
  }

  for (InputSplit inputSplit : is) {
    RecordReader<LongWritable, Text> rr = inputFormat.getRecordReader(
        inputSplit, conf, Reporter.NULL);
    LongWritable key = rr.createKey();
    Text value = rr.createValue();

    while (rr.next(key, value)) {
      // each line read back is "key<TAB>value" as written, matching
      // what createTestInput digested (key, tab, value; no newline)
      md5.update(value.getBytes(), 0, value.getLength());
    }

    rr.close();
  }

  assertTrue(Arrays.equals(expectedMd5, md5.digest()));
}
+
/**
 * Creates an lzo file with random data.
 *
 * @param outputDir Output directory
 * @param workDir Work directory, this is where the file is written to
 * @param conf Job configuration; updated to enable lzop output compression.
 * @param fs File system we're using
 * @return MD5 digest over everything written: key, tab, value per record.
 * @throws IOException
 */
private byte[] createTestInput(Path outputDir, Path workDir, JobConf conf,
    FileSystem fs) throws IOException {

  RecordWriter<Text, Text> rw = null;

  md5.reset();

  try {
    TextOutputFormat<Text, Text> output = new TextOutputFormat<Text, Text>();
    TextOutputFormat.setCompressOutput(conf, true);
    TextOutputFormat.setOutputCompressorClass(conf, LzopCodec.class);
    TextOutputFormat.setOutputPath(conf, outputDir);
    TextOutputFormat.setWorkOutputPath(conf, workDir);

    rw = output.getRecordWriter(null, conf, lzoFileName, Reporter.NULL);

    //has to be enough data to create a couple of lzo blocks
    int charsToOutput = 10485760;
    char[] chars = "abcdefghijklmnopqrstuvwxyz\u00E5\u00E4\u00F6"
        .toCharArray();

    Random r = new Random(System.currentTimeMillis());
    Text key = new Text();
    Text value = new Text();
    // NOTE(review): charsMax is used as an exclusive nextInt bound in
    // fillText, so the last char of `chars` is never emitted -- looks
    // like an off-by-one, confirm whether intended.
    int charsMax = chars.length - 1;
    for (int i = 0; i < charsToOutput;) {
      i += fillText(chars, r, charsMax, key);
      i += fillText(chars, r, charsMax, value);
      rw.write(key, value);
      md5.update(key.getBytes(), 0, key.getLength());
      //text output format writes tab between the key and value
      md5.update("\t".getBytes("UTF-8"));
      md5.update(value.getBytes(), 0, value.getLength());
    }
  } finally {
    if (rw != null) {
      rw.close(Reporter.NULL);
    }
  }

  byte[] result = md5.digest();
  md5.reset();
  return result;
}
+
+ private int fillText(char[] chars, Random r, int charsMax, Text text) {
+ StringBuilder sb = new StringBuilder();
+ //get a reasonable string length
+ int stringLength = r.nextInt(charsMax * 2);
+ for (int j = 0; j < stringLength; j++) {
+ sb.append(chars[r.nextInt(charsMax)]);
+ }
+ text.set(sb.toString());
+ return stringLength;
+ }
+
+}