You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not reproduced in this plain-text copy.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/12 03:12:24 UTC
svn commit: r984625 - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/mapred/
lang/java/src/test/java/org/apache/avro/mapred/
Author: cutting
Date: Thu Aug 12 01:12:23 2010
New Revision: 984625
URL: http://svn.apache.org/viewvc?rev=984625&view=rev
Log:
AVRO-586. Java: Permit specification of MapReduce output file metadata properties. Contributed by Ken Krugler.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug 12 01:12:23 2010
@@ -106,6 +106,9 @@ Avro 1.4.0 (unreleased)
AVRO-557. Java: Cache ResolvingDecoder instances, speeding
DatumReader benchmarks by 5x to 9x. (cutting & scotcarey)
+ AVRO-586. Java: Permit specification of MapReduce output file
+ metadata properties. (Ken Krugler via cutting)
+
BUG FIXES
AVRO-502. Memory leak from parsing JSON schema.
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Thu Aug 12 01:12:23 2010
@@ -19,6 +19,8 @@
package org.apache.avro.mapred;
import java.util.Collection;
+import java.net.URLEncoder;
+import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +40,10 @@ public class AvroJob {
public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
/** The configuration key for a job's output schema. */
public static final String OUTPUT_SCHEMA = "avro.output.schema";
+ /** The configuration key prefix for text output metadata. */
+ public static final String TEXT_PREFIX = "avro.meta.text.";
+ /** The configuration key prefix for binary output metadata. */
+ public static final String BINARY_PREFIX = "avro.meta.binary.";
/** Configure a job's map input schema. */
public static void setInputSchema(JobConf job, Schema s) {
@@ -70,6 +76,25 @@ public class AvroJob {
configureAvroJob(job);
}
+ /** Add metadata to job output files.*/
+ public static void setOutputMeta(JobConf job, String key, String value) {
+ job.set(TEXT_PREFIX+key, value);
+ }
+ /** Add metadata to job output files.*/
+ public static void setOutputMeta(JobConf job, String key, long value) {
+ job.set(TEXT_PREFIX+key, Long.toString(value));
+ }
+ /** Add metadata to job output files.*/
+ public static void setOutputMeta(JobConf job, String key, byte[] value) {
+ try {
+ job.set(BINARY_PREFIX+key,
+ URLEncoder.encode(new String(value, "ISO-8859-1"),
+ "ISO-8859-1"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Return a job's output key schema. */
public static Schema getOutputSchema(Configuration job) {
return Schema.parse(job.get(OUTPUT_SCHEMA));
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Thu Aug 12 01:12:23 2010
@@ -19,6 +19,8 @@
package org.apache.avro.mapred;
import java.io.IOException;
+import java.util.Map;
+import java.net.URLDecoder;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +73,17 @@ public class AvroOutputFormat <T>
writer.setCodec(CodecFactory.deflateCodec(level));
}
+ // copy metadata from job
+ for (Map.Entry<String,String> e : job) {
+ if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
+ writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+ e.getValue());
+ if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
+ writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
+ URLDecoder.decode(e.getValue(), "ISO-8859-1")
+ .getBytes("ISO-8859-1"));
+ }
+
Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
writer.create(schema, path.getFileSystem(job).create(path));
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java Thu Aug 12 01:12:23 2010
@@ -81,6 +81,8 @@ public class TestWordCount {
FileOutputFormat.setOutputPath(job, outputPath);
FileOutputFormat.setCompressOutput(job, true);
+ WordCountUtil.setMeta(job);
+
JobClient.runJob(job);
WordCountUtil.validateCountsFile();
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Thu Aug 12 01:12:23 2010
@@ -26,11 +26,13 @@ import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.io.PrintStream;
+import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
@@ -92,7 +94,7 @@ class WordCountUtil {
out.close();
}
- public static void validateCountsFile() throws IOException {
+ public static void validateCountsFile() throws Exception {
DatumReader<Pair<Utf8,Long>> reader
= new SpecificDatumReader<Pair<Utf8,Long>>();
InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
@@ -104,8 +106,31 @@ class WordCountUtil {
COUNTS.get(wc.key().toString()), wc.value());
numWords++;
}
+ checkMeta(counts);
in.close();
assertEquals(COUNTS.size(), numWords);
}
+ // metadata tests
+ private static final String STRING_KEY = "string-key";
+ private static final String LONG_KEY = "long-key";
+ private static final String BYTES_KEY = "bytes-key";
+
+ private static final String STRING_META_VALUE = "value";
+ private static final long LONG_META_VALUE = 666;
+ private static final byte[] BYTES_META_VALUE
+ = new byte[] {(byte)0x00, (byte)0x80, (byte)0xff};
+
+ public static void setMeta(JobConf job) {
+ AvroJob.setOutputMeta(job, STRING_KEY, STRING_META_VALUE);
+ AvroJob.setOutputMeta(job, LONG_KEY, LONG_META_VALUE);
+ AvroJob.setOutputMeta(job, BYTES_KEY, BYTES_META_VALUE);
+ }
+
+ public static void checkMeta(DataFileStream<?> in) throws Exception {
+ assertEquals(STRING_META_VALUE, in.getMetaString(STRING_KEY));
+ assertEquals(LONG_META_VALUE, in.getMetaLong(LONG_KEY));
+ assertTrue(Arrays.equals(BYTES_META_VALUE, in.getMeta(BYTES_KEY)));
+ }
+
}