Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/08 23:19:43 UTC
svn commit: r441653 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/ src/test/org/apache/hadoop/io/compress/ src/test/org/apache...
Author: cutting
Date: Fri Sep 8 14:19:41 2006
New Revision: 441653
URL: http://svn.apache.org/viewvc?view=rev&rev=441653
Log:
HADOOP-474. Add CompressionCodecFactory and use it in TextInputFormat and TextOutputFormat. Also add gzip codec and fix some problems with UTF8 text inputs. Contributed by Owen.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep 8 14:19:41 2006
@@ -161,6 +161,13 @@
40. HADOOP-517. Fix a contrib/streaming bug in end-of-line detection.
(Hairong Kuang via cutting)
+41. HADOOP-474. Add CompressionCodecFactory, and use it in
+ TextInputFormat and TextOutputFormat. Compressed input files are
+ automatically decompressed when they have the correct extension.
+ Output files will, when output compression is specified, be
+ generated with an appropriate extension. Also add a gzip codec and
+ fix problems with UTF8 text inputs. (omalley via cutting)
+
Release 0.5.0 - 2006-08-04
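To see the new knobs end-to-end: a job driver can request compressed reduce output through the static helpers added to OutputFormatBase in this commit. A minimal sketch (the driver class name is hypothetical):

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputFormatBase;

    public class CompressedOutputDriver {             // hypothetical name
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Sets mapred.output.compress=true for this job.
        OutputFormatBase.setCompressOutput(conf, true);
        // Chooses the codec; TextOutputFormat then appends the codec's
        // default extension (".gz") to each output file name.
        OutputFormatBase.setOutputCompressorClass(conf, GzipCodec.class);
        return conf;
      }
    }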
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri Sep 8 14:19:41 2006
@@ -87,6 +87,13 @@
facilitate opening large map files using less memory.</description>
</property>
+<property>
+ <name>io.compression.codecs</name>
+ <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec</value>
+ <description>A list of the compression codec classes that can be used
+ for compression/decompression.</description>
+</property>
+
<!-- file system properties -->
<property>
@@ -441,6 +448,21 @@
-->
<property>
+ <name>mapred.output.compress</name>
+ <value>false</value>
+ <description>Should the outputs of the reduces be compressed?
+ </description>
+</property>
+
+<property>
+ <name>mapred.output.compression.codec</name>
+ <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+ <description>If the reduce outputs are compressed, how should they be
+ compressed?
+ </description>
+</property>
+
+<property>
<name>mapred.compress.map.output</name>
<value>false</value>
<description>Should the outputs of the maps be compressed before being
@@ -449,7 +471,7 @@
</property>
<property>
- <name>mapred.seqfile.compress.blocksize</name>
+ <name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
<description>The minimum block size for compression in block compressed
SequenceFiles.
@@ -457,7 +479,7 @@
</property>
<property>
- <name>mapred.seqfile.lazydecompress</name>
+ <name>io.seqfile.lazydecompress</name>
<value>true</value>
<description>Should values of block-compressed SequenceFiles be decompressed
only when necessary?
@@ -465,7 +487,7 @@
</property>
<property>
- <name>mapred.seqfile.sorter.recordlimit</name>
+ <name>io.seqfile.sorter.recordlimit</name>
<value>1000000</value>
<description>The limit on number of records to be kept in memory in a spill
in SequenceFiles.Sorter
@@ -473,8 +495,8 @@
</property>
<property>
- <name>mapred.seqfile.compression.type</name>
- <value>NONE</value>
+ <name>io.seqfile.compression.type</name>
+ <value>RECORD</value>
<description>The default compression type for SequenceFile.Writer.
</description>
</property>
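These keys can also be set programmatically. A minimal sketch using the new CompressionCodecFactory.setCodecClasses helper, which writes the io.compression.codecs value shown above (class name hypothetical):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class CodecListExample {                   // hypothetical name
      public static Configuration defaults() {
        Configuration conf = new Configuration();
        // Produces the same comma-separated list as hadoop-default.xml;
        // a site can append its own CompressionCodec classes here.
        CompressionCodecFactory.setCodecClasses(conf, Arrays.asList(
            new Class[]{DefaultCodec.class, GzipCodec.class}));
        return conf;
      }
    }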
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Sep 8 14:19:41 2006
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -63,6 +64,27 @@
/** Compress sequences of records together in blocks. */
BLOCK
}
+
+ /**
+ * Get the compression type for sequence files
+ * @param job the job config to look in
+ * @return the kind of compression to use
+ */
+ static public CompressionType getCompressionType(Configuration job) {
+ String name = job.get("io.seqfile.compression.type");
+ return name == null ? CompressionType.RECORD :
+ CompressionType.valueOf(name);
+ }
+
+ /**
+ * Set the compression type for sequence files.
+ * @param job the configuration to modify
+ * @param val the new compression type (none, block, record)
+ */
+ static public void setCompressionType(Configuration job,
+ CompressionType val) {
+ job.set("io.seqfile.compression.type", val.toString());
+ }
/**
* Construct the preferred type of SequenceFile Writer.
@@ -685,7 +707,7 @@
Class keyClass, Class valClass, CompressionCodec codec)
throws IOException {
super.init(name, fs.create(name), keyClass, valClass, true, codec);
- init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+ init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
writeFileHeader();
@@ -699,7 +721,7 @@
throws IOException {
super.init(name, fs.create(name, progress), keyClass, valClass,
true, codec);
- init(conf.getInt("mapred.seqfile.compress.blocksize", 1000000));
+ init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
writeFileHeader();
@@ -998,7 +1020,7 @@
}
- lazyDecompress = conf.getBoolean("mapred.seqfile.lazydecompress", true);
+ lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);
}
/** Close the file. */
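The new accessors pair with the renamed io.seqfile.compression.type key; a minimal usage sketch (class name hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;

    public class SeqFileTypeExample {                 // hypothetical name
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Stores io.seqfile.compression.type=BLOCK in the config.
        SequenceFile.setCompressionType(conf, CompressionType.BLOCK);
        // Reads it back; RECORD is returned when the key is unset.
        System.out.println(SequenceFile.getCompressionType(conf));
      }
    }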
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Fri Sep 8 14:19:41 2006
@@ -173,18 +173,29 @@
* @exception CharacterCodingException if the array contains invalid UTF8 code
*/
public void set(byte[] utf8) throws CharacterCodingException {
- validateUTF8(utf8);
- set(utf8, utf8.length);
+ set(utf8, 0, utf8.length);
}
/** copy a text. */
public void set(Text other) {
- set(other.bytes, other.length);
+ try {
+ set(other.bytes, 0, other.length);
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("bad Text UTF8 encoding", e);
+ }
}
- private void set(byte[] utf8, int len ) {
+ /**
+ * Set the Text to a range of bytes
+ * @param utf8 the data to copy from
+ * @param start the first position of the new string
+ * @param len the number of bytes of the new string
+ */
+ public void set(byte[] utf8, int start, int len
+ ) throws CharacterCodingException {
+ validateUTF8(utf8, start, len);
setCapacity(len);
- System.arraycopy(utf8, 0, bytes, 0, len);
+ System.arraycopy(utf8, start, bytes, 0, len);
this.length = len;
}
@@ -416,10 +427,17 @@
* @exception MalformedInputException if the byte array contains invalid utf-8
*/
public static void validateUTF8(byte[] utf8) throws MalformedInputException {
- validateUTF(utf8, 0, utf8.length);
+ validateUTF8(utf8, 0, utf8.length);
}
- public static void validateUTF(byte[] utf8, int start, int len)
+ /**
+ * Check to see if a byte array is valid utf-8
+ * @param utf8 the array of bytes
+ * @param start the offset of the first byte in the array
+ * @param len the length of the byte sequence
+ * @throws MalformedInputException if the byte array contains invalid bytes
+ */
+ public static void validateUTF8(byte[] utf8, int start, int len)
throws MalformedInputException {
int count = start;
int leadByte = 0;
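The widened set(byte[], int, int) entry point validates the byte range as UTF-8 before copying; a minimal sketch (class name hypothetical):

    import org.apache.hadoop.io.Text;

    public class TextRangeExample {                   // hypothetical name
      public static void main(String[] args) throws Exception {
        byte[] buf = "xxhelloxx".getBytes("UTF-8");
        Text t = new Text();
        // Copies bytes 2..6 ("hello"); an invalid UTF-8 range would
        // raise a CharacterCodingException instead.
        t.set(buf, 2, 5);
        System.out.println(t);                        // prints "hello"
      }
    }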
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodecFactory.java Fri Sep 8 14:19:41 2006
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory that will find the correct codec for a given filename.
+ * @author Owen O'Malley
+ */
+public class CompressionCodecFactory {
+
+ public static final Log LOG =
+ LogFactory.getLog(CompressionCodecFactory.class.getName());
+
+ /**
+ * A map from the reversed filename suffixes to the codecs.
+ * This is probably overkill, because the maps should be small, but it
+ * automatically supports finding the longest matching suffix.
+ */
+ private SortedMap<String, CompressionCodec> codecs = null;
+
+ private void addCodec(CompressionCodec codec) {
+ String suffix = codec.getDefaultExtension();
+ codecs.put(new StringBuffer(suffix).reverse().toString(), codec);
+ }
+
+ /**
+ * Print the extension map out as a string.
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ Iterator<Map.Entry<String, CompressionCodec>> itr =
+ codecs.entrySet().iterator();
+ buf.append("{ ");
+ if (itr.hasNext()) {
+ Map.Entry<String, CompressionCodec> entry = itr.next();
+ buf.append(entry.getKey());
+ buf.append(": ");
+ buf.append(entry.getValue().getClass().getName());
+ while (itr.hasNext()) {
+ entry = itr.next();
+ buf.append(", ");
+ buf.append(entry.getKey());
+ buf.append(": ");
+ buf.append(entry.getValue().getClass().getName());
+ }
+ }
+ buf.append(" }");
+ return buf.toString();
+ }
+
+ /**
+ * Get the list of codecs listed in the configuration
+ * @param conf the configuration to look in
+ * @return a list of the Configuration classes or null if the attribute
+ * was not set
+ */
+ public static List<Class> getCodecClasses(Configuration conf) {
+ String codecsString = conf.get("io.compression.codecs");
+ if (codecsString != null) {
+ List<Class> result = new ArrayList<Class>();
+ StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
+ while (codecSplit.hasMoreElements()) {
+ String codecSubstring = codecSplit.nextToken();
+ if (codecSubstring.length() != 0) {
+ try {
+ Class cls = conf.getClassByName(codecSubstring);
+ if (!CompressionCodec.class.isAssignableFrom(cls)) {
+ throw new IllegalArgumentException("Class " + codecSubstring +
+ " is not a CompressionCodec");
+ }
+ result.add(cls);
+ } catch (ClassNotFoundException ex) {
+ throw new IllegalArgumentException("Compression codec " +
+ codecSubstring + " not found.",
+ ex);
+ }
+ }
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Sets a list of codec classes in the configuration.
+ * @param conf the configuration to modify
+ * @param classes the list of classes to set
+ */
+ public static void setCodecClasses(Configuration conf,
+ List<Class> classes) {
+ StringBuffer buf = new StringBuffer();
+ Iterator<Class> itr = classes.iterator();
+ if (itr.hasNext()) {
+ Class cls = itr.next();
+ buf.append(cls.getName());
+ while(itr.hasNext()) {
+ buf.append(',');
+ buf.append(itr.next().getName());
+ }
+ }
+ conf.set("io.compression.codecs",buf.toString());
+ }
+
+ /**
+ * Find the codecs specified in the config value io.compression.codecs
+ * and register them. Defaults to gzip and the default (deflate) codec.
+ */
+ public CompressionCodecFactory(Configuration conf) {
+ codecs = new TreeMap<String, CompressionCodec>();
+ List<Class> codecClasses = getCodecClasses(conf);
+ if (codecClasses == null) {
+ addCodec(new GzipCodec());
+ addCodec(new DefaultCodec());
+ } else {
+ Iterator<Class> itr = codecClasses.iterator();
+ while (itr.hasNext()) {
+ CompressionCodec codec =
+ (CompressionCodec) ReflectionUtils.newInstance(itr.next(), conf);
+ addCodec(codec);
+ }
+ }
+ }
+
+ /**
+ * Find the relevant compression codec for the given file based on its
+ * filename suffix.
+ * @param file the filename to check
+ * @return the codec object
+ */
+ public CompressionCodec getCodec(Path file) {
+ CompressionCodec result = null;
+ if (codecs != null) {
+ String filename = file.getName();
+ String reversedFilename = new StringBuffer(filename).reverse().toString();
+ SortedMap<String, CompressionCodec> subMap =
+ codecs.headMap(reversedFilename);
+ if (!subMap.isEmpty()) {
+ String potentialSuffix = subMap.lastKey();
+ if (reversedFilename.startsWith(potentialSuffix)) {
+ result = codecs.get(potentialSuffix);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Removes a suffix from a filename, if it has it.
+ * @param filename the filename to strip
+ * @param suffix the suffix to remove
+ * @return the shortened filename
+ */
+ public static String removeSuffix(String filename, String suffix) {
+ if (filename.endsWith(suffix)) {
+ return filename.substring(0, filename.length() - suffix.length());
+ }
+ return filename;
+ }
+
+ /**
+ * A little test program.
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new org.apache.hadoop.mapred.JobConf();
+ CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+ boolean encode = false;
+ for(int i=0; i < args.length; ++i) {
+ if ("-in".equals(args[i])) {
+ encode = true;
+ } else if ("-out".equals(args[i])) {
+ encode = false;
+ } else {
+ CompressionCodec codec = factory.getCodec(new Path(args[i]));
+ if (codec == null) {
+ System.out.println("Codec for " + args[i] + " not found.");
+ } else {
+ if (encode) {
+ CompressionOutputStream out =
+ codec.createOutputStream(new java.io.FileOutputStream(args[i]));
+ byte[] buffer = new byte[100];
+ String inFilename = removeSuffix(args[i],
+ codec.getDefaultExtension());
+ java.io.InputStream in = new java.io.FileInputStream(inFilename);
+ int len = in.read(buffer);
+ while (len > 0) {
+ out.write(buffer, 0, len);
+ len = in.read(buffer);
+ }
+ in.close();
+ out.close();
+ } else {
+ CompressionInputStream in =
+ codec.createInputStream(new java.io.FileInputStream(args[i]));
+ byte[] buffer = new byte[100];
+ int len = in.read(buffer);
+ while (len > 0) {
+ System.out.write(buffer, 0, len);
+ len = in.read(buffer);
+ }
+ in.close();
+ }
+ }
+ }
+ }
+ }
+}
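A note on the reversed-suffix lookup above: suffixes are stored reversed so a single headMap()/lastKey() probe finds the longest registered suffix that the reversed filename starts with. A minimal sketch of the lookup under the default codec list (class name hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecLookupExample {                 // hypothetical name
      public static void main(String[] args) {
        CompressionCodecFactory factory =
            new CompressionCodecFactory(new Configuration());
        // ".gz" is stored reversed as "zg."; "part-0.gz" reverses to
        // "zg.0-trap", which starts with "zg.", so GzipCodec matches.
        CompressionCodec codec = factory.getCodec(new Path("part-0.gz"));
        System.out.println(codec.getClass().getName());
        // Recover the uncompressed name by stripping the extension.
        System.out.println(CompressionCodecFactory.removeSuffix(
            "part-0.gz", codec.getDefaultExtension())); // part-0
      }
    }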
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Fri Sep 8 14:19:41 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.*;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+/**
+ * This class creates gzip compressors/decompressors.
+ * @author Owen O'Malley
+ */
+public class GzipCodec extends DefaultCodec {
+
+ /**
+ * A bridge that wraps around a DeflaterOutputStream to make it
+ * a CompressionOutputStream.
+ * @author Owen O'Malley
+ */
+ protected static class GzipOutputStream extends DefaultCompressionOutputStream {
+ private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+ public ResetableGZIPOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+
+ public void resetState() throws IOException {
+ def.reset();
+ }
+ }
+
+ public GzipOutputStream(OutputStream out) throws IOException {
+ super(new ResetableGZIPOutputStream(out));
+ }
+
+ /**
+ * Allow subclasses to supply a different stream type here.
+ * @param out the Deflater stream to use
+ */
+ protected GzipOutputStream(DefaultCompressionOutputStream out) {
+ super(out);
+ }
+
+
+ public void resetState() throws IOException {
+ ((ResetableGZIPOutputStream) out).resetState();
+ }
+
+ }
+
+ protected static class GzipInputStream extends DefaultCompressionInputStream {
+
+ private static class ResetableGZIPInputStream extends GZIPInputStream {
+ public ResetableGZIPInputStream(InputStream in) throws IOException {
+ super(in);
+ }
+
+ public void resetState() throws IOException {
+ inf.reset();
+ }
+ }
+
+ public GzipInputStream(InputStream in) throws IOException {
+ super(new ResetableGZIPInputStream(in));
+ }
+
+ /**
+ * Allow subclasses to directly set the inflater stream.
+ */
+ protected GzipInputStream(DefaultCompressionInputStream in) {
+ super(in);
+ }
+ }
+
+ /**
+ * Create a stream compressor that will write to the given output stream.
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to
+ */
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return new GzipOutputStream(out);
+ }
+
+ /**
+ * Create a stream decompressor that will read from the given input stream.
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ */
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return new GzipInputStream(in);
+ }
+
+ /**
+ * Get the default filename extension for this kind of compression.
+ * @return the extension including the '.'
+ */
+ public String getDefaultExtension() {
+ return ".gz";
+ }
+
+}
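A round trip through the codec shows the stream bridge in action; a minimal sketch (class name hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class GzipRoundTrip {                      // hypothetical name
      public static void main(String[] args) throws Exception {
        GzipCodec codec = new GzipCodec();
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        CompressionOutputStream out = codec.createOutputStream(compressed);
        out.write("hello gzip".getBytes("UTF-8"));
        out.close();                    // close() writes the gzip trailer
        CompressionInputStream in = codec.createInputStream(
            new ByteArrayInputStream(compressed.toByteArray()));
        byte[] buf = new byte[64];
        int len = in.read(buf);
        System.out.println(new String(buf, 0, len, "UTF-8"));
      }
    }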
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Fri Sep 8 14:19:41 2006
@@ -40,6 +40,17 @@
this.minSplitSize = minSplitSize;
}
+ /**
+ * Is the given filename splitable? Usually true, but if the file is
+ * stream compressed, it will not be.
+ * @param fs the file system that the file is on
+ * @param filename the file name to check
+ * @return is this file splitable?
+ */
+ protected boolean isSplitable(FileSystem fs, Path filename) {
+ return true;
+ }
+
public abstract RecordReader getRecordReader(FileSystem fs,
FileSplit split,
JobConf job,
@@ -117,15 +128,12 @@
Path[] files = listPaths(fs, job);
+ long totalSize = 0; // compute total size
for (int i = 0; i < files.length; i++) { // check we have valid files
Path file = files[i];
if (fs.isDirectory(file) || !fs.exists(file)) {
throw new IOException("Not a file: "+files[i]);
}
- }
-
- long totalSize = 0; // compute total size
- for (int i = 0; i < files.length; i++) {
totalSize += fs.getLength(files[i]);
}
@@ -138,19 +146,24 @@
for (int i = 0; i < files.length; i++) {
Path file = files[i];
long length = fs.getLength(file);
- long blockSize = fs.getBlockSize(file);
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
- bytesRemaining -= splitSize;
- }
-
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+ if (isSplitable(fs, file)) {
+ long blockSize = fs.getBlockSize(file);
+ long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+ splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+ }
+ } else {
+ if (length != 0) {
+ splits.add(new FileSplit(file, 0, length));
+ }
}
- //LOG.info( "Generating splits for " + i + "th file: " + file.getName() );
}
//LOG.info( "Total # of splits: " + splits.size() );
return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Sep 8 14:19:41 2006
@@ -37,6 +37,7 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -332,6 +333,38 @@
*/
public boolean getCompressMapOutput() {
return getBoolean("mapred.compress.map.output", false);
+ }
+
+ /**
+ * Set the given class as the compression codec for the map outputs.
+ * @param codecClass the CompressionCodec class that will compress the
+ * map outputs
+ */
+ public void setMapOutputCompressorClass(Class codecClass) {
+ setCompressMapOutput(true);
+ setClass("mapred.output.compression.codec", codecClass,
+ CompressionCodec.class);
+ }
+
+ /**
+ * Get the codec for compressing the map outputs
+ * @param defaultValue the value to return if it is not set
+ * @return the CompressionCodec class that should be used to compress the
+ * map outputs
+ * @throws IllegalArgumentException if the class was specified, but not found
+ */
+ public Class getMapOutputCompressorClass(Class defaultValue) {
+ String name = get("mapred.output.compression.codec");
+ if (name == null) {
+ return defaultValue;
+ } else {
+ try {
+ return getClassByName(name);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name +
+ " was not found.", e);
+ }
+ }
}
/**
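Map-output compression is configured separately from the reduce output; a minimal sketch of the new JobConf helper (class name hypothetical):

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompressionExample {        // hypothetical name
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Also flips mapred.compress.map.output to true, so map spill
        // files are written through the chosen codec.
        conf.setMapOutputCompressorClass(GzipCodec.class);
        return conf;
      }
    }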
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Sep 8 14:19:41 2006
@@ -20,8 +20,11 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.commons.logging.*;
@@ -123,21 +126,27 @@
final int partitions = job.getNumReduceTasks();
final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
try {
+ Reporter reporter = getReporter(umbilical, getProgress());
FileSystem localFs = FileSystem.getNamed("local", job);
- /** TODO: Figure out a way to deprecate 'mapred.compress.map.output' */
- boolean compressTemps = job.getBoolean("mapred.compress.map.output",
- false);
+ CompressionCodec codec = null;
+ CompressionType compressionType = CompressionType.NONE;
+ if (job.getCompressMapOutput()) {
+ // find the kind of compression to do, defaulting to record
+ compressionType = SequenceFile.getCompressionType(job);
+
+ // find the right codec
+ Class codecClass =
+ job.getMapOutputCompressorClass(DefaultCodec.class);
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(codecClass, job);
+ }
for (int i = 0; i < partitions; i++) {
+ Path filename = mapOutputFile.getOutputFile(getTaskId(), i);
outs[i] =
- SequenceFile.createWriter(localFs, job,
- this.mapOutputFile.getOutputFile(getTaskId(), i),
- job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(),
- compressTemps ? CompressionType.RECORD :
- CompressionType.valueOf(
- job.get("mapred.seqfile.compression.type",
- "NONE"))
- );
+ SequenceFile.createWriter(localFs, job, filename,
+ job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(),
+ compressionType, codec, reporter);
LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
}
@@ -157,7 +166,6 @@
};
OutputCollector collector = partCollector;
- Reporter reporter = getReporter(umbilical, getProgress());
boolean combining = job.getCombinerClass() != null;
if (combining) { // add combining collector
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Fri Sep 8 14:19:41 2006
@@ -20,10 +20,63 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
/** A base class for {@link OutputFormat}. */
public abstract class OutputFormatBase implements OutputFormat {
+
+ /**
+ * Set whether the output of the reduce is compressed
+ * @param val the new setting
+ */
+ public static void setCompressOutput(JobConf conf, boolean val) {
+ conf.setBoolean("mapred.output.compress", val);
+ }
+
+ /**
+ * Is the reduce output compressed?
+ * @return true, if the output should be compressed
+ */
+ public static boolean getCompressOutput(JobConf conf) {
+ return conf.getBoolean("mapred.output.compress", false);
+ }
+
+ /**
+ * Set the given class as the output compression codec.
+ * @param conf the JobConf to modify
+ * @param codecClass the CompressionCodec class that will compress the
+ * reduce outputs
+ */
+ public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+ setCompressOutput(conf, true);
+ conf.setClass("mapred.output.compression.codec", codecClass,
+ CompressionCodec.class);
+ }
+
+ /**
+ * Get the codec for compressing the reduce outputs
+ * @param conf the Configuration to look in
+ * @param defaultValue the value to return if it is not set
+ * @return the CompressionCodec class that should be used to compress the
+ * reduce outputs
+ * @throws IllegalArgumentException if the class was specified, but not found
+ */
+ public static Class getOutputCompressorClass(JobConf conf,
+ Class defaultValue) {
+ String name = conf.get("mapred.output.compression.codec");
+ if (name == null) {
+ return defaultValue;
+ } else {
+ try {
+ return conf.getClassByName(name);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name +
+ " was not found.", e);
+ }
+ }
+ }
+
public abstract RecordWriter getRecordWriter(FileSystem fs,
JobConf job, String name,
Progressable progress)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Sep 8 14:19:41 2006
@@ -27,8 +27,10 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
public class SequenceFileOutputFormat extends OutputFormatBase {
@@ -38,18 +40,23 @@
throws IOException {
Path file = new Path(job.getOutputPath(), name);
+ CompressionCodec codec = null;
+ CompressionType compressionType = CompressionType.NONE;
+ if (getCompressOutput(job)) {
+ // find the kind of compression to do
+ compressionType = SequenceFile.getCompressionType(job);
- /** TODO: Figure out a way to deprecate 'mapred.output.compress' */
+ // find the right codec
+ Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(codecClass, job);
+ }
final SequenceFile.Writer out =
SequenceFile.createWriter(fs, job, file,
job.getOutputKeyClass(),
job.getOutputValueClass(),
- job.getBoolean("mapred.output.compress", false) ?
- CompressionType.RECORD :
- CompressionType.valueOf(
- job.get("mapred.seqfile.compression.type",
- "NONE")
- ),
+ compressionType,
+ codec,
progress);
return new RecordWriter() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Fri Sep 8 14:19:41 2006
@@ -16,106 +16,148 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
+import java.io.*;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
/** An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return is used to signal end of line. Keys are
* the position in the file, and values are the line of text. */
-public class TextInputFormat extends InputFormatBase {
+public class TextInputFormat extends InputFormatBase implements JobConfigurable {
+
+ private CompressionCodecFactory compressionCodecs = null;
+
+ public void configure(JobConf conf) {
+ compressionCodecs = new CompressionCodecFactory(conf);
+ }
+
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ return compressionCodecs.getCodec(file) == null;
+ }
+
+ protected static class LineRecordReader implements RecordReader {
+ private long pos;
+ private long end;
+ private BufferedInputStream in;
+ private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+ /**
+ * Provide a bridge to get the bytes from the ByteArrayOutputStream
+ * without creating a new byte array.
+ */
+ private static class TextStuffer extends OutputStream {
+ public Text target;
+ public void write(int b) {
+ throw new UnsupportedOperationException("write(byte) not supported");
+ }
+ public void write(byte[] data, int offset, int len) throws IOException {
+ target.set(data, offset, len);
+ }
+ }
+ private TextStuffer bridge = new TextStuffer();
+
+ public LineRecordReader(InputStream in, long offset, long endOffset) {
+ this.in = new BufferedInputStream(in);
+ this.pos = offset;
+ this.end = endOffset;
+ }
+
+ public WritableComparable createKey() {
+ return new LongWritable();
+ }
+
+ public Writable createValue() {
+ return new Text();
+ }
+
+ /** Read a line. */
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+ if (pos >= end)
+ return false;
+
+ ((LongWritable)key).set(pos); // key is position
+ buffer.reset();
+ long bytesRead = readLine(in, buffer);
+ if (bytesRead == 0) {
+ return false;
+ }
+ pos += bytesRead;
+ bridge.target = (Text) value;
+ buffer.writeTo(bridge);
+ return true;
+ }
+
+ public synchronized long getPos() throws IOException {
+ return pos;
+ }
+
+ public synchronized void close() throws IOException {
+ in.close();
+ }
+ }
+
public RecordReader getRecordReader(FileSystem fs, FileSplit split,
JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(split.toString());
- final long start = split.getStart();
- final long end = start + split.getLength();
+ long start = split.getStart();
+ long end = start + split.getLength();
+ final Path file = split.getPath();
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
- final FSDataInputStream in = fs.open(split.getPath());
+ FSDataInputStream fileIn = fs.open(split.getPath());
+ InputStream in = fileIn;
- if (start != 0) {
- in.seek(start-1);
- while (in.getPos() < end) { // scan to the next newline in the file
- char c = (char)in.read();
- if (c == '\n')
- break;
-
- if (c == '\r') {
- long curPos = in.getPos();
- char nextC = (char)in.read();
- if (nextC != '\n') {
- in.seek(curPos);
- }
-
- break;
- }
- }
+ if (codec != null) {
+ in = codec.createInputStream(fileIn);
+ end = Long.MAX_VALUE;
+ } else if (start != 0) {
+ fileIn.seek(start-1);
+ readLine(fileIn, null);
+ start = fileIn.getPos();
}
-
- return new RecordReader() {
-
- public WritableComparable createKey() {
- return new LongWritable();
- }
-
- public Writable createValue() {
- return new Text();
- }
-
- /** Read a line. */
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- long pos = in.getPos();
- if (pos >= end)
- return false;
-
- ((LongWritable)key).set(pos); // key is position
- ((Text)value).set(readLine(in)); // value is line
- return true;
- }
-
- public synchronized long getPos() throws IOException {
- return in.getPos();
- }
-
- public synchronized void close() throws IOException { in.close(); }
-
- };
+
+ return new LineRecordReader(in, start, end);
}
- private static String readLine(FSDataInputStream in) throws IOException {
- StringBuffer buffer = new StringBuffer();
+ public static long readLine(InputStream in,
+ OutputStream out) throws IOException {
+ long bytes = 0;
while (true) {
int b = in.read();
- if (b == -1)
+ if (b == -1) {
break;
-
- char c = (char)b; // bug: this assumes eight-bit characters.
- if (c == '\n')
+ }
+ bytes += 1;
+
+ byte c = (byte)b;
+ if (c == '\n') {
break;
-
- if (c == '\r') {
- long curPos = in.getPos();
- char nextC = (char)in.read();
+ }
+
+ if (c == '\r') {
+ in.mark(1);
+ byte nextC = (byte)in.read();
if (nextC != '\n') {
- in.seek(curPos);
+ in.reset();
+ } else {
+ bytes += 1;
}
-
break;
}
- buffer.append(c);
+ if (out != null) {
+ out.write(c);
+ }
}
-
- return buffer.toString();
+ return bytes;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Fri Sep 8 14:19:41 2006
@@ -16,6 +16,7 @@
package org.apache.hadoop.mapred;
+import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
@@ -24,30 +25,51 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes plain text files. */
public class TextOutputFormat extends OutputFormatBase {
+ protected static class LineRecordWriter implements RecordWriter {
+ private DataOutputStream out;
+
+ public LineRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ public synchronized void write(WritableComparable key, Writable value)
+ throws IOException {
+ out.write(key.toString().getBytes("UTF-8"));
+ out.writeByte('\t');
+ out.write(value.toString().getBytes("UTF-8"));
+ out.writeByte('\n');
+ }
+ public synchronized void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+ }
+
public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
String name, Progressable progress) throws IOException {
- Path file = new Path(job.getOutputPath(), name);
-
- final FSDataOutputStream out = fs.create(file, progress);
-
- return new RecordWriter() {
- public synchronized void write(WritableComparable key, Writable value)
- throws IOException {
- out.write(key.toString().getBytes("UTF-8"));
- out.writeByte('\t');
- out.write(value.toString().getBytes("UTF-8"));
- out.writeByte('\n');
- }
- public synchronized void close(Reporter reporter) throws IOException {
- out.close();
- }
- };
+ Path dir = job.getOutputPath();
+ boolean isCompressed = getCompressOutput(job);
+ if (!isCompressed) {
+ FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+ return new LineRecordWriter(fileOut);
+ } else {
+ Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
+ // create the named codec
+ CompressionCodec codec = (CompressionCodec)
+ ReflectionUtils.newInstance(codecClass, job);
+ // build the filename including the extension
+ Path filename = new Path(dir, name + codec.getDefaultExtension());
+ FSDataOutputStream fileOut = fs.create(filename, progress);
+ return new LineRecordWriter(new DataOutputStream
+ (codec.createOutputStream(fileOut)));
+ }
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java Fri Sep 8 14:19:41 2006
@@ -198,7 +198,7 @@
Text text = new Text("abcd\u20acbdcd\u20ac");
byte [] utf8 = text.getBytes();
int length = text.getLength();
- Text.validateUTF(utf8, 0, length);
+ Text.validateUTF8(utf8, 0, length);
}
public void testTextText() throws CharacterCodingException {
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=auto&rev=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Fri Sep 8 14:19:41 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.*;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestCodecFactory extends TestCase {
+
+ private static class BaseCodec implements CompressionCodec {
+ public CompressionOutputStream createOutputStream(OutputStream out) {
+ return null;
+ }
+
+ public CompressionInputStream createInputStream(InputStream in) {
+ return null;
+ }
+
+ public String getDefaultExtension() {
+ return ".base";
+ }
+ }
+
+ private static class BarCodec extends BaseCodec {
+ public String getDefaultExtension() {
+ return "bar";
+ }
+ }
+
+ private static class FooBarCodec extends BaseCodec {
+ public String getDefaultExtension() {
+ return ".foo.bar";
+ }
+ }
+
+ private static class FooCodec extends BaseCodec {
+ public String getDefaultExtension() {
+ return ".foo";
+ }
+ }
+
+ /**
+ * Returns a factory for a given set of codecs
+ * @param classes the codec classes to include
+ * @return a new factory
+ */
+ private static CompressionCodecFactory setClasses(Class[] classes) {
+ Configuration conf = new Configuration();
+ CompressionCodecFactory.setCodecClasses(conf, Arrays.asList(classes));
+ return new CompressionCodecFactory(conf);
+ }
+
+ private static void checkCodec(String msg,
+ Class expected, CompressionCodec actual) {
+ assertEquals(msg + " unexpected codec found",
+ expected.getName(),
+ actual.getClass().getName());
+ }
+
+ public static void testFinding() {
+ CompressionCodecFactory factory =
+ new CompressionCodecFactory(new Configuration());
+ CompressionCodec codec = factory.getCodec(new Path("/tmp/foo.bar"));
+ assertEquals("default factory foo codec", null, codec);
+ codec = factory.getCodec(new Path("/tmp/foo.gz"));
+ checkCodec("default factory for .gz", GzipCodec.class, codec);
+ factory = setClasses(new Class[0]);
+ codec = factory.getCodec(new Path("/tmp/foo.bar"));
+ assertEquals("empty codec bar codec", null, codec);
+ codec = factory.getCodec(new Path("/tmp/foo.gz"));
+ assertEquals("empty codec gz codec", null, codec);
+ factory = setClasses(new Class[]{BarCodec.class, FooCodec.class,
+ FooBarCodec.class});
+ codec = factory.getCodec(new Path("/tmp/.foo.bar.gz"));
+ assertEquals("full factory gz codec", null, codec);
+ codec = factory.getCodec(new Path("/tmp/foo.bar"));
+ checkCodec("full factory bar codec", BarCodec.class, codec);
+ codec = factory.getCodec(new Path("/tmp/foo/baz.foo.bar"));
+ checkCodec("full factory foo bar codec", FooBarCodec.class, codec);
+ codec = factory.getCodec(new Path("/tmp/foo.foo"));
+ checkCodec("full factory foo codec", FooCodec.class, codec);
+ }
+}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Sep 8 14:19:41 2006
@@ -17,7 +17,6 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.lib.*;
import junit.framework.TestCase;
import java.io.*;
@@ -83,7 +82,6 @@
* as many times as we were instructed.
*/
static class RandomGenMapper implements Mapper {
- Random r = new Random();
public void configure(JobConf job) {
}
@@ -105,7 +103,6 @@
}
public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
- int keyint = ((IntWritable) key).get();
while (it.hasNext()) {
int val = ((IntWritable) it.next()).get();
out.collect(new Text("" + val), new Text(""));
@@ -136,7 +133,6 @@
}
public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
- long pos = ((LongWritable) key).get();
Text str = (Text) val;
out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
@@ -203,7 +199,6 @@
private static int range = 10;
private static int counts = 100;
private static Random r = new Random();
- private static Configuration conf = new Configuration();
/**
public TestMapRed(int range, int counts, Configuration conf) throws IOException {
@@ -252,19 +247,14 @@
private static class MyReduce extends IdentityReducer {
private JobConf conf;
private boolean compressInput;
- private boolean compressOutput;
private String taskId;
- private int partition;
private boolean first = true;
public void configure(JobConf conf) {
this.conf = conf;
compressInput = conf.getBoolean("mapred.compress.map.output",
false);
- compressOutput = conf.getBoolean("mapred.compress.output",
- false);
taskId = conf.get("mapred.task.id");
- partition = conf.getInt("mapred.task.partition", -1);
}
public void reduce(WritableComparable key, Iterator values,
@@ -295,6 +285,7 @@
Path inDir = new Path(testdir, "in");
Path outDir = new Path(testdir, "out");
FileSystem fs = FileSystem.get(conf);
+ fs.delete(testdir);
conf.setInputPath(inDir);
conf.setOutputPath(outDir);
conf.setMapperClass(MyMap.class);
@@ -306,10 +297,10 @@
conf.setCombinerClass(IdentityReducer.class);
}
if (compressMapOutput) {
- conf.setBoolean("mapred.compress.map.output", true);
+ conf.setCompressMapOutput(true);
}
if (compressReduceOutput) {
- conf.setBoolean("mapred.output.compress", true);
+ SequenceFileOutputFormat.setCompressOutput(conf, true);
}
try {
fs.mkdirs(testdir);
@@ -354,6 +345,7 @@
//
// Generate distribution of ints. This is the answer key.
//
+ JobConf conf = new JobConf();
int countsToGo = counts;
int dist[] = new int[range];
for (int i = 0; i < range; i++) {
@@ -376,7 +368,10 @@
fs.mkdirs(randomIns);
Path answerkey = new Path(randomIns, "answer.key");
- SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey, IntWritable.class, IntWritable.class);
+ SequenceFile.Writer out =
+ SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
+ IntWritable.class,
+ SequenceFile.CompressionType.NONE);
try {
for (int i = 0; i < range; i++) {
out.append(new IntWritable(i), new IntWritable(dist[i]));
@@ -409,8 +404,6 @@
JobConf genJob = new JobConf(conf);
genJob.setInputPath(randomIns);
- genJob.setInputKeyClass(IntWritable.class);
- genJob.setInputValueClass(IntWritable.class);
genJob.setInputFormat(SequenceFileInputFormat.class);
genJob.setMapperClass(RandomGenMapper.class);
@@ -479,8 +472,6 @@
fs.delete(finalOuts);
JobConf mergeJob = new JobConf(conf);
mergeJob.setInputPath(intermediateOuts);
- mergeJob.setInputKeyClass(IntWritable.class);
- mergeJob.setInputValueClass(IntWritable.class);
mergeJob.setInputFormat(SequenceFileInputFormat.class);
mergeJob.setMapperClass(MergeMapper.class);
@@ -564,8 +555,8 @@
}
int i = 0;
- int range = Integer.parseInt(argv[i++]);
- int counts = Integer.parseInt(argv[i++]);
- launch();
+ range = Integer.parseInt(argv[i++]);
+ counts = Integer.parseInt(argv[i++]);
+ launch();
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=441653&r1=441652&r2=441653
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Sep 8 14:19:41 2006
@@ -20,20 +20,33 @@
import java.util.*;
import junit.framework.TestCase;
+import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.compress.*;
public class TestTextInputFormat extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestTextInputFormat.class.getName());
private static int MAX_LENGTH = 10000;
- private static Configuration conf = new Configuration();
+
+ private static JobConf defaultConf = new JobConf();
+ private static FileSystem localFs = null;
+ static {
+ try {
+ localFs = FileSystem.getNamed("local", defaultConf);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ "TestTextInputFormat");
public void testFormat() throws Exception {
- JobConf job = new JobConf(conf);
- FileSystem fs = FileSystem.getNamed("local", conf);
- Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
- Path file = new Path(dir, "test.txt");
+ JobConf job = new JobConf();
+ Path file = new Path(workDir, "test.txt");
Reporter reporter = new Reporter() {
public void setStatus(String status) throws IOException {}
@@ -41,20 +54,20 @@
};
int seed = new Random().nextInt();
- //LOG.info("seed = "+seed);
+ LOG.info("seed = "+seed);
Random random = new Random(seed);
- fs.delete(dir);
- job.setInputPath(dir);
+ localFs.delete(workDir);
+ job.setInputPath(workDir);
// for a variety of lengths
for (int length = 0; length < MAX_LENGTH;
length+= random.nextInt(MAX_LENGTH/10)+1) {
- //LOG.info("creating; entries = " + length);
+ LOG.debug("creating; entries = " + length);
// create a file with length entries
- Writer writer = new OutputStreamWriter(fs.create(file));
+ Writer writer = new OutputStreamWriter(localFs.create(file));
try {
for (int i = 0; i < length; i++) {
writer.write(Integer.toString(i));
@@ -65,33 +78,38 @@
}
// try splitting the file in a variety of sizes
- InputFormat format = new TextInputFormat();
+ TextInputFormat format = new TextInputFormat();
+ format.configure(job);
LongWritable key = new LongWritable();
Text value = new Text();
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(MAX_LENGTH/20)+1;
- //LOG.info("splitting: requesting = " + numSplits);
- FileSplit[] splits = format.getSplits(fs, job, numSplits);
- //LOG.info("splitting: got = " + splits.length);
+ LOG.debug("splitting: requesting = " + numSplits);
+ FileSplit[] splits = format.getSplits(localFs, job, numSplits);
+ LOG.debug("splitting: got = " + splits.length);
// check each split
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +
+ splits[j].getLength());
RecordReader reader =
- format.getRecordReader(fs, splits[j], job, reporter);
+ format.getRecordReader(localFs, splits[j], job, reporter);
try {
int count = 0;
while (reader.next(key, value)) {
int v = Integer.parseInt(value.toString());
- // if (bits.get(v)) {
- // LOG.info("splits["+j+"]="+splits[j]+" : " + v);
- // LOG.info("@"+reader.getPos());
- // }
+ LOG.debug("read " + v);
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v +
+ " in split " + j +
+ " at position "+reader.getPos());
+ }
assertFalse("Key in multiple partitions.", bits.get(v));
bits.set(v);
count++;
}
- //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+ LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
} finally {
reader.close();
}
@@ -102,6 +120,110 @@
}
}
+ private InputStream makeStream(String str) throws IOException {
+ Text text = new Text(str);
+ return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+ }
+
+ public void testUTF8() throws Exception {
+ InputStream in = makeStream("abcd\u20acbdcd\u20ac");
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TextInputFormat.readLine(in, out);
+ Text line = new Text();
+ line.set(out.toByteArray());
+ assertEquals("readLine changed utf8 characters",
+ "abcd\u20acbdcd\u20ac", line.toString());
+ in = makeStream("abc\u200axyz");
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ line.set(out.toByteArray());
+ assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+ }
+
+ public void testNewLines() throws Exception {
+ InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line1 length", 1, out.size());
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line2 length", 2, out.size());
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line3 length", 0, out.size());
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line4 length", 3, out.size());
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line5 length", 4, out.size());
+ out.reset();
+ TextInputFormat.readLine(in, out);
+ assertEquals("line5 length", 5, out.size());
+ assertEquals("end of file", 0, TextInputFormat.readLine(in, out));
+ }
+
+ private static void writeFile(FileSystem fs, Path name,
+ CompressionCodec codec,
+ String contents) throws IOException {
+ OutputStream stm;
+ if (codec == null) {
+ stm = fs.create(name);
+ } else {
+ stm = codec.createOutputStream(fs.create(name));
+ }
+ stm.write(contents.getBytes());
+ stm.close();
+ }
+
+ private static class VoidReporter implements Reporter {
+ public void progress() {}
+ public void setStatus(String msg) {}
+ }
+ private static final Reporter voidReporter = new VoidReporter();
+
+ private static List<Text> readSplit(InputFormat format,
+ FileSplit split,
+ JobConf job) throws IOException {
+ List<Text> result = new ArrayList<Text>();
+ RecordReader reader = format.getRecordReader(localFs, split, job,
+ voidReporter);
+ LongWritable key = (LongWritable) reader.createKey();
+ Text value = (Text) reader.createValue();
+ while (reader.next(key, value)) {
+ result.add(value);
+ value = (Text) reader.createValue();
+ }
+ return result;
+ }
+
+ /**
+ * Test using the gzip codec for reading
+ */
+ public static void testGzip() throws IOException {
+ CompressionCodec gzip = new GzipCodec();
+ localFs.delete(workDir);
+ writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+ "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
+ writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+ "this is a test\nof gzip\n");
+ JobConf job = new JobConf();
+ job.setInputPath(workDir);
+ TextInputFormat format = new TextInputFormat();
+ format.configure(job);
+ FileSplit[] splits = format.getSplits(localFs, job, 100);
+ assertEquals("compressed splits == 2", 2, splits.length);
+ List<Text> results = readSplit(format, splits[0], job);
+ assertEquals("splits[0] length", 6, results.size());
+ assertEquals("splits[0][5]", " dog", results.get(5).toString());
+ results = readSplit(format, splits[1], job);
+ assertEquals("splits[1] length", 2, results.size());
+ assertEquals("splits[1][0]", "this is a test",
+ results.get(0).toString());
+ assertEquals("splits[1][1]", "of gzip",
+ results.get(1).toString());
+ }
+
public static void main(String[] args) throws Exception {
new TestTextInputFormat().testFormat();
}