You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by Farhan Husain <fa...@csebuet.org> on 2010/01/05 01:13:22 UTC
Re: Multiple file output
Hello,
You can extend the FileOutputFormat class. Here is an example:
public class MultipleTextOutputFormatByPredicates<K, V> extends
> FileOutputFormat<K, V> {
> protected static class MultipleOutputByPredicatesLineRecordWriter<K, V> extends RecordWriter<K, V> {
> private static final String utf8 = "UTF-8";
> private static final byte[] newline;
> static {
> try {
> newline = "\n".getBytes(utf8);
> } catch (UnsupportedEncodingException uee) {
> throw new IllegalArgumentException("can't find " + utf8
> + " encoding");
> }
> }
>
> protected TaskAttemptContext job;
> protected CompressionCodec codec;
> protected String extension = "";
> protected Map<String, DataOutputStream> outMap;
> private final byte[] keyValueSeparator;
>
> public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec,
> String keyValueSeparator,
> TaskAttemptContext job) {
> this.job = job;
> this.codec = codec;
> if (null != codec)
> this.extension = codec.getDefaultExtension();
> try {
> this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
> } catch (UnsupportedEncodingException uee) {
> throw new IllegalArgumentException("can't find " + utf8
> + " encoding");
> }
> outMap = new HashMap<String, DataOutputStream>();
> }
>
> public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec, TaskAttemptContext job) {
> this(codec, "\t", job);
> }
>
> /**
> * Write the object to the byte stream, handling Text as a special case.
> *
> * @param o
> * the object to print
> * @throws IOException
> * if the write throws, we pass it on
> */
> private void writeObject(Object o, DataOutputStream out) throws IOException {
> if (o instanceof Text) {
> Text to = (Text) o;
> out.write(to.getBytes(), 0, to.getLength());
> } else {
> out.write(o.toString().getBytes(utf8));
> }
> }
>
> public synchronized void write(K key, V value) throws IOException {
>
> boolean nullKey = key == null || key instanceof NullWritable;
> boolean nullValue = value == null || value instanceof NullWritable;
> if (nullKey || nullValue) {
> return;
> }
> String sPredicate = key.toString().replace(':', '_');
> DataOutputStream out = outMap.get(sPredicate);
> if (null == out) {
> Path file = new Path(job.getConfiguration().get("mapred.output.dir"), sPredicate);
> FileSystem fs = file.getFileSystem(job.getConfiguration());
> FSDataOutputStream fileOut = fs.create(file, false);
> outMap.put(sPredicate, fileOut);
> out = fileOut;
> }
> out.write(keyValueSeparator);
> writeObject(value, out);
> out.write(newline);
> }
>
> public synchronized void close(TaskAttemptContext context)
> throws IOException {
> Iterator<DataOutputStream> iter = outMap.values().iterator();
> while (iter.hasNext())
> iter.next().close();
> }
> }
>
> public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
> throws IOException, InterruptedException {
> Configuration conf = job.getConfiguration();
> boolean isCompressed = getCompressOutput(job);
> String keyValueSeparator = conf.get(
> "mapred.textoutputformat.separator", "\t");
> CompressionCodec codec = null;
> if (isCompressed) {
> Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
> job, GzipCodec.class);
> codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
> conf);
> }
> return new MultipleOutputByPredicatesLineRecordWriter<K, V>(codec, keyValueSeparator, job);
> }
> }
>
> Thanks,
Farhan
On Mon, Dec 28, 2009 at 12:27 PM, Huazhong Ning <ni...@akiira.com> wrote:
> Hi all,
>
> I need your help on multiple file output. I have many big files and I want
> the processing result of each file to be written to a separate file. I know
> that in the old Hadoop APIs the class MultipleOutputFormat serves this
> purpose, but I cannot find the same class in the new APIs. Does anybody know
> how to solve this problem in the new APIs?
> Thanks a lot.
>
> Ning, Huazhong
>
>
>