Posted to common-user@hadoop.apache.org by Farhan Husain <fa...@csebuet.org> on 2010/01/05 01:13:22 UTC

Re: Multiple file output

Hello,

You can extend the FileOutputFormat class from the new API. Here is an example:

// Imports assumed for a Hadoop 0.20+ build using the new (org.apache.hadoop.mapreduce) API.
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * An output format for the new API that writes each record to a file named
 * after its key (the "predicate"), so every distinct key gets its own file
 * under the job's output directory.
 */
public class MultipleTextOutputFormatByPredicates<K, V> extends FileOutputFormat<K, V> {

    protected static class MultipleOutputByPredicatesLineRecordWriter<K, V>
            extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext job;
        protected CompressionCodec codec;
        protected String extension = "";
        // One open stream per distinct key, created lazily on first use.
        protected Map<String, DataOutputStream> outMap;
        // Retained from the TextOutputFormat template; the key itself is carried
        // by the file name, so no separator is written into the records.
        private final byte[] keyValueSeparator;

        public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec,
                String keyValueSeparator, TaskAttemptContext job) {
            this.job = job;
            this.codec = codec;
            if (null != codec)
                this.extension = codec.getDefaultExtension();
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
            outMap = new HashMap<String, DataOutputStream>();
        }

        public MultipleOutputByPredicatesLineRecordWriter(CompressionCodec codec,
                TaskAttemptContext job) {
            this(codec, "\t", job);
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         *
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o, DataOutputStream out) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey || nullValue) {
                return;
            }
            // The key (predicate) becomes the file name; ':' is replaced because it
            // is awkward in path names.
            String sPredicate = key.toString().replace(':', '_');
            DataOutputStream out = outMap.get(sPredicate);
            if (null == out) {
                // Files are created directly in the job output directory (not the
                // task work directory), named after the predicate.
                Path file = new Path(job.getConfiguration().get("mapred.output.dir"),
                        sPredicate + extension);
                FileSystem fs = file.getFileSystem(job.getConfiguration());
                FSDataOutputStream fileOut = fs.create(file, false);
                out = (null == codec) ? fileOut
                        : new DataOutputStream(codec.createOutputStream(fileOut));
                outMap.put(sPredicate, out);
            }
            // The key is already encoded in the file name, so only the value is written.
            writeObject(value, out);
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            for (DataOutputStream out : outMap.values()) {
                out.close();
            }
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
        CompressionCodec codec = null;
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        }
        return new MultipleOutputByPredicatesLineRecordWriter<K, V>(codec, keyValueSeparator, job);
    }
}
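To show how this plugs in, here is a minimal driver sketch for the new API. The driver class, the PredicateMapper (which assumes tab-separated "predicate<TAB>value" input lines), and the input/output path arguments are placeholders for illustration, not part of my actual job:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PredicateSplitDriver {

    // Hypothetical mapper: splits each input line at the first tab and emits
    // (predicate, rest-of-line) pairs.
    public static class PredicateMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("\t", 2);
            if (parts.length == 2) {
                context.write(new Text(parts[0]), new Text(parts[1]));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "split output by predicate");
        job.setJarByClass(PredicateSplitDriver.class);

        job.setMapperClass(PredicateMapper.class);
        // The identity Reducer groups all records for one predicate into a
        // single reduce task, so each predicate file is created exactly once.
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Route the output through the custom format so that each distinct
        // key (predicate) lands in its own file under the output directory.
        job.setOutputFormatClass(MultipleTextOutputFormatByPredicates.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Keeping a reduce phase matters here: with the default hash partitioner, all records for one predicate reach the same reduce task, so the per-predicate file is created only once (fs.create is called with overwrite = false).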

Thanks,
Farhan

On Mon, Dec 28, 2009 at 12:27 PM, Huazhong Ning <ni...@akiira.com> wrote:

> Hi all,
>
> I need your help with multiple file output. I have many big files, and I hope
> the processing result of each file can be output to a separate file. I know
> that in the old Hadoop API, the class MultipleOutputFormat works for this
> purpose, but I cannot find the same class in the new API. Does anybody know
> how to solve this problem with the new API?
> Thanks a lot.
>
> Ning, Huazhong
>