You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/05/30 10:27:55 UTC

svn commit: r1129053 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/

Author: cutting
Date: Mon May 30 08:27:54 2011
New Revision: 1129053

URL: http://svn.apache.org/viewvc?rev=1129053&view=rev
Log:
AVRO-830.  Java: Add AvroTextOutputFormat to permit Hadoop streaming jobs to easily write Avro format output with "bytes" as schema.  Contributed by Tom White.

Added:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java
      - copied, changed from r1129032, avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Removed:
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1129053&r1=1129052&r2=1129053&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon May 30 08:27:54 2011
@@ -18,6 +18,10 @@ Avro 1.5.2 (unreleased)
 
   NEW FEATURES
 
+    AVRO-830. Java: Add AvroTextOutputFormat to permit Hadoop
+    streaming jobs to easily write Avro format output with "bytes" as
+    schema.  (Tom White via cutting)
+
   IMPROVEMENTS
 
     AVRO-820. Java: Permit applications to catch exceptions thrown

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1129053&r1=1129052&r2=1129053&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Mon May 30 08:27:54 2011
@@ -19,6 +19,7 @@
 package org.apache.avro.mapred;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.net.URLDecoder;
 
@@ -64,21 +65,9 @@ public class AvroOutputFormat <T>
   public static void setSyncInterval(JobConf job, int syncIntervalInBytes) {
     job.setInt(SYNC_INTERVAL_KEY, syncIntervalInBytes);
   }
-
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable>
-    getRecordWriter(FileSystem ignore, JobConf job,
-                    String name, Progressable prog)
-    throws IOException {
-
-    boolean isMapOnly = job.getNumReduceTasks() == 0;
-    Schema schema = isMapOnly
-      ? AvroJob.getMapOutputSchema(job)
-      : AvroJob.getOutputSchema(job);
-
-    final DataFileWriter<T> writer =
-      new DataFileWriter<T>(new ReflectDatumWriter<T>());
-
+  
+  static <T> void configureDataFileWriter(DataFileWriter<T> writer,
+      JobConf job) throws UnsupportedEncodingException {
     if (FileOutputFormat.getCompressOutput(job)) {
       int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
@@ -87,7 +76,7 @@ public class AvroOutputFormat <T>
         : CodecFactory.fromString(codecName);
       writer.setCodec(factory);
     }
-
+    
     writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
 
     // copy metadata from job
@@ -100,6 +89,23 @@ public class AvroOutputFormat <T>
                        URLDecoder.decode(e.getValue(), "ISO-8859-1")
                        .getBytes("ISO-8859-1"));
     }
+  }
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    boolean isMapOnly = job.getNumReduceTasks() == 0;
+    Schema schema = isMapOnly
+      ? AvroJob.getMapOutputSchema(job)
+      : AvroJob.getOutputSchema(job);
+
+    final DataFileWriter<T> writer =
+      new DataFileWriter<T>(new ReflectDatumWriter<T>());
+    
+    configureDataFileWriter(writer, job);
 
     Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
     writer.create(schema, path.getFileSystem(job).create(path));

Added: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java?rev=1129053&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java Mon May 30 08:27:54 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import static org.apache.avro.mapred.AvroOutputFormat.EXT;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/** The equivalent of {@link org.apache.hadoop.mapred.TextOutputFormat} for
+ * writing to Avro Data Files with a <code>"bytes"</code> schema. */
+public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  private static final String UTF8 = "UTF-8";
+
+  @Override
+  public RecordWriter<K, V>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    Schema schema = Schema.create(Schema.Type.BYTES);
+    
+    final byte[] keyValueSeparator =
+      job.get("mapreduce.output.textoutputformat.separator", "\t").getBytes(UTF8);
+
+    final DataFileWriter<ByteBuffer> writer =
+      new DataFileWriter<ByteBuffer>(new ReflectDatumWriter<ByteBuffer>());
+
+    AvroOutputFormat.configureDataFileWriter(writer, job);
+
+    Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
+    writer.create(schema, path.getFileSystem(job).create(path));
+
+    return new AvroTextRecordWriter(writer, keyValueSeparator);
+  }
+  
+  class AvroTextRecordWriter implements RecordWriter<K, V> {
+    private final DataFileWriter<ByteBuffer> writer;
+    private final byte[] keyValueSeparator;
+    
+    public AvroTextRecordWriter(DataFileWriter<ByteBuffer> writer,
+        byte[] keyValueSeparator) {
+      this.writer = writer;
+      this.keyValueSeparator = keyValueSeparator;
+    }
+    
+    public void write(K key, V value) throws IOException {
+      boolean nullKey = key == null || key instanceof NullWritable;
+      boolean nullValue = value == null || value instanceof NullWritable;
+      if (nullKey && nullValue) {
+        return;
+      } else if (!nullKey && nullValue) {
+        writer.append(toByteBuffer(key));
+      } else if (nullKey && !nullValue) {
+        writer.append(toByteBuffer(value));
+      } else {
+        writer.append(toByteBuffer(key, keyValueSeparator, value));
+      }
+    }
+    
+    public void close(Reporter reporter) throws IOException {
+      writer.close();
+    }
+    
+    private ByteBuffer toByteBuffer(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        return ByteBuffer.wrap(to.getBytes(), 0, to.getLength());
+      } else {
+        return ByteBuffer.wrap(o.toString().getBytes(UTF8));
+      }
+    }
+    
+    private ByteBuffer toByteBuffer(Object key, byte[] sep, Object value)
+        throws IOException {
+      byte[] keyBytes, valBytes;
+      int keyLength, valLength;
+      if (key instanceof Text) {
+        Text tkey = (Text) key;
+        keyBytes = tkey.getBytes();
+        keyLength = tkey.getLength();
+      } else {
+        keyBytes = key.toString().getBytes(UTF8);
+        keyLength = keyBytes.length;
+      }
+      if (value instanceof Text) {
+        Text tval = (Text) value;
+        valBytes = tval.getBytes();
+        valLength = tval.getLength();
+      } else {
+        valBytes = value.toString().getBytes(UTF8);
+        valLength = valBytes.length;
+      }
+      ByteBuffer buf = ByteBuffer.allocate(keyLength + sep.length + valLength);
+      buf.put(keyBytes, 0, keyLength);
+      buf.put(sep);
+      buf.put(valBytes, 0, valLength);
+      buf.rewind();
+      return buf;
+    }
+
+  }
+
+}

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java?rev=1129053&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java Mon May 30 08:27:54 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.junit.Test;
+
+public class TestAvroTextOutputFormat {
+  
+  private static final String UTF8 = "UTF-8";
+
+  @Test
+  public void testAvroTextRecordWriter() throws Exception {
+    File file = new File(System.getProperty("test.dir", "."), "writer");
+    Schema schema = Schema.create(Schema.Type.BYTES);
+    DatumWriter<ByteBuffer> datumWriter =
+      new GenericDatumWriter<ByteBuffer>(schema);
+    DataFileWriter<ByteBuffer> fileWriter =
+      new DataFileWriter<ByteBuffer>(datumWriter);
+    fileWriter.create(schema, file);
+    RecordWriter<Object, Object> rw = new AvroTextOutputFormat<Object, Object>()
+      .new AvroTextRecordWriter(fileWriter, "\t".getBytes(UTF8));
+    
+    rw.write(null, null);
+    rw.write(null, NullWritable.get());
+    rw.write(NullWritable.get(), null);
+    rw.write(NullWritable.get(), NullWritable.get());
+    
+    rw.write("k1", null);
+    rw.write("k2", NullWritable.get());
+
+    rw.write(null, "v1");
+    rw.write(NullWritable.get(), "v2");
+
+    rw.write("k3", "v3");
+    rw.write(new Text("k4"), new Text("v4"));
+    
+    rw.close(null);
+
+    DatumReader<ByteBuffer> reader = new GenericDatumReader<ByteBuffer>();
+    DataFileReader<ByteBuffer> fileReader =
+      new DataFileReader<ByteBuffer>(file, reader);
+    assertEquals("k1", asString(fileReader.next()));
+    assertEquals("k2", asString(fileReader.next()));
+    assertEquals("v1", asString(fileReader.next()));
+    assertEquals("v2", asString(fileReader.next()));
+    assertEquals("k3\tv3", asString(fileReader.next()));
+    assertEquals("k4\tv4", asString(fileReader.next()));
+    assertFalse("End", fileReader.hasNext());
+  }
+  
+  private String asString(ByteBuffer buf) throws UnsupportedEncodingException {
+    byte[] b = new byte[buf.remaining()];
+    buf.get(b);
+    return new String(b, UTF8);
+  }
+
+}

Copied: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java (from r1129032, avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java)
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java?p2=avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java&p1=avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java&r1=1129032&r2=1129053&rev=1129053&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java Mon May 30 08:27:54 2011
@@ -18,26 +18,20 @@
 
 package org.apache.avro.mapred;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
 
-public class TestAvroAsTextInputFormat {
+public class TestAvroTextSort {
   
   @Test
   /**
    * Run the identity job on a "bytes" Avro file using AvroAsTextInputFormat
-   * and check the output is a sorted text file.
+   * and AvroTextOutputFormat to produce a sorted "bytes" Avro file.
    */
   public void testSort() throws Exception {
     JobConf job = new JobConf();
@@ -48,6 +42,7 @@ public class TestAvroAsTextInputFormat {
     WordCountUtil.writeLinesBytesFile();
     
     job.setInputFormat(AvroAsTextInputFormat.class);
+    job.setOutputFormat(AvroTextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     
     FileInputFormat.setInputPaths(job, new Path(dir + "/in"));

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1129053&r1=1129052&r2=1129053&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Mon May 30 08:27:54 2011
@@ -44,6 +44,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.file.DataFileWriter;
@@ -60,7 +61,7 @@ class WordCountUtil {
   private static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
   private static final File SORTED_FILE
-    = new File(new File(DIR, "out"), "part-00000");
+    = new File(new File(DIR, "out"), "part-00000.avro");
 
   public static final String[] LINES = new String[] {
     "the quick brown fox jumps over the lazy dog",
@@ -131,18 +132,25 @@ class WordCountUtil {
   }
   
   public static void validateSortedFile() throws Exception {
-    BufferedReader reader = new BufferedReader(new FileReader(SORTED_FILE));
+    DatumReader<ByteBuffer> reader = new GenericDatumReader<ByteBuffer>();
+    InputStream in = new BufferedInputStream(
+        new FileInputStream(SORTED_FILE));
+    DataFileStream<ByteBuffer> lines =
+        new DataFileStream<ByteBuffer>(in,reader);
     List<String> sortedLines = new ArrayList<String>();
     for (String line : LINES) {
       sortedLines.add(line);
     }
     Collections.sort(sortedLines);
     for (String expectedLine : sortedLines) {
-      assertEquals(expectedLine, reader.readLine().trim());
+      ByteBuffer buf = lines.next();
+      byte[] b = new byte[buf.remaining()];
+      buf.get(b);
+      assertEquals(expectedLine, new String(b, "UTF-8").trim());
     }
-    assertNull(reader.readLine());
+    assertFalse(lines.hasNext());
   }
-
+  
   // metadata tests
   private static final String STRING_KEY = "string-key";
   private static final String LONG_KEY = "long-key";