You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@avro.apache.org by cu...@apache.org on 2010/08/12 03:12:24 UTC

svn commit: r984625 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/mapred/ lang/java/src/test/java/org/apache/avro/mapred/

Author: cutting
Date: Thu Aug 12 01:12:23 2010
New Revision: 984625

URL: http://svn.apache.org/viewvc?rev=984625&view=rev
Log:
AVRO-586.  Java: Permit specification of MapReduce output file metadata properties.  Contributed by Ken Krugler.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Aug 12 01:12:23 2010
@@ -106,6 +106,9 @@ Avro 1.4.0 (unreleased)
     AVRO-557. Java: Cache ResolvingDecoder instances, speeding
     DatumReader benchmarks by 5x to 9x.  (cutting & scotcarey)
 
+    AVRO-586. Java: Permit specification of MapReduce output file
+    metadata properties. (Ken Krugler via cutting)
+
   BUG FIXES
 
     AVRO-502. Memory leak from parsing JSON schema.

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Thu Aug 12 01:12:23 2010
@@ -19,6 +19,8 @@
 package org.apache.avro.mapred;
 
 import java.util.Collection;
+import java.net.URLEncoder;
+import java.io.UnsupportedEncodingException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +40,10 @@ public class AvroJob {
   public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
   /** The configuration key for a job's output schema. */
   public static final String OUTPUT_SCHEMA = "avro.output.schema";
+  /** The configuration key prefix for a text output metadata. */
+  public static final String TEXT_PREFIX = "avro.meta.text.";
+  /** The configuration key prefix for a binary output metadata. */
+  public static final String BINARY_PREFIX = "avro.meta.binary.";
 
   /** Configure a job's map input schema. */
   public static void setInputSchema(JobConf job, Schema s) {
@@ -70,6 +76,25 @@ public class AvroJob {
     configureAvroJob(job);
   }
 
+  /** Add metadata to job output files.*/
+  public static void setOutputMeta(JobConf job, String key, String value) {
+    job.set(TEXT_PREFIX+key, value);
+  }
+  /** Add metadata to job output files.*/
+  public static void setOutputMeta(JobConf job, String key, long value) {
+    job.set(TEXT_PREFIX+key, Long.toString(value));
+  }
+  /** Add metadata to job output files.*/
+  public static void setOutputMeta(JobConf job, String key, byte[] value) {
+    try {
+      job.set(BINARY_PREFIX+key,
+              URLEncoder.encode(new String(value, "ISO-8859-1"),
+                                "ISO-8859-1"));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /** Return a job's output key schema. */
   public static Schema getOutputSchema(Configuration job) {
     return Schema.parse(job.get(OUTPUT_SCHEMA));

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Thu Aug 12 01:12:23 2010
@@ -19,6 +19,8 @@
 package org.apache.avro.mapred;
 
 import java.io.IOException;
+import java.util.Map;
+import java.net.URLDecoder;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,6 +73,17 @@ public class AvroOutputFormat <T>
       writer.setCodec(CodecFactory.deflateCodec(level));
     }
 
+    // copy metadata from job
+    for (Map.Entry<String,String> e : job) {
+      if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
+        writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+                       e.getValue());
+      if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
+        writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
+                       URLDecoder.decode(e.getValue(), "ISO-8859-1")
+                       .getBytes("ISO-8859-1"));
+    }
+
     Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
     writer.create(schema, path.getFileSystem(job).create(path));
 

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCount.java Thu Aug 12 01:12:23 2010
@@ -81,6 +81,8 @@ public class TestWordCount {
     FileOutputFormat.setOutputPath(job, outputPath);
     FileOutputFormat.setCompressOutput(job, true);
     
+    WordCountUtil.setMeta(job);
+
     JobClient.runJob(job);
     
     WordCountUtil.validateCountsFile();

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=984625&r1=984624&r2=984625&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Thu Aug 12 01:12:23 2010
@@ -26,11 +26,13 @@ import java.io.InputStream;
 import java.io.FileInputStream;
 import java.io.BufferedInputStream;
 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.StringTokenizer;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.avro.Schema;
 import org.apache.avro.util.Utf8;
@@ -92,7 +94,7 @@ class WordCountUtil {
     out.close();
   }
 
-  public static void validateCountsFile() throws IOException {
+  public static void validateCountsFile() throws Exception {
     DatumReader<Pair<Utf8,Long>> reader
       = new SpecificDatumReader<Pair<Utf8,Long>>();
     InputStream in = new BufferedInputStream(new FileInputStream(COUNTS_FILE));
@@ -104,8 +106,31 @@ class WordCountUtil {
                    COUNTS.get(wc.key().toString()), wc.value());
       numWords++;
     }
+    checkMeta(counts);
     in.close();
     assertEquals(COUNTS.size(), numWords);
   }
 
+  // metadata tests
+  private static final String STRING_KEY = "string-key";
+  private static final String LONG_KEY = "long-key";
+  private static final String BYTES_KEY = "bytes-key";
+  
+  private static final String STRING_META_VALUE = "value";
+  private static final long LONG_META_VALUE = 666;
+  private static final byte[] BYTES_META_VALUE
+    = new byte[] {(byte)0x00, (byte)0x80, (byte)0xff};
+
+  public static void setMeta(JobConf job) {
+    AvroJob.setOutputMeta(job, STRING_KEY, STRING_META_VALUE);
+    AvroJob.setOutputMeta(job, LONG_KEY, LONG_META_VALUE);
+    AvroJob.setOutputMeta(job, BYTES_KEY, BYTES_META_VALUE);
+  }
+
+  public static void checkMeta(DataFileStream<?> in) throws Exception {
+    assertEquals(STRING_META_VALUE, in.getMetaString(STRING_KEY));
+    assertEquals(LONG_META_VALUE, in.getMetaLong(LONG_KEY));
+    assertTrue(Arrays.equals(BYTES_META_VALUE, in.getMeta(BYTES_KEY)));
+  }
+
 }