You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/12/14 00:50:22 UTC

svn commit: r1550852 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/test/java/org/apache/avro/mapreduce/

Author: cutting
Date: Fri Dec 13 23:50:22 2013
New Revision: 1550852

URL: http://svn.apache.org/r1550852
Log:
AVRO-1344. Java: Expose sync interval configuration in mapreduce API.  Contributed by Rob Turner.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 13 23:50:22 2013
@@ -41,6 +41,9 @@ Trunk (not yet released)
     AVRO-1234. Java: Permit AvroInputFormat to process files whose
     names don't end in .avro.  (Dave Beech & Sandy Ryza via cutting)
 
+    AVRO-1344. Java: Expose sync interval configuration in mapreduce API.
+    (Rob Turner via cutting)
+
   BUG FIXES
 
     AVRO-1368. Fix SpecificDatumWriter to, when writing a string

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -71,11 +71,12 @@ public class AvroKeyOutputFormat<T> exte
      * @param writerSchema The writer schema for the records to write.
      * @param compressionCodec The compression type for the writer file.
      * @param outputStream The target output stream for the records.
+     * @param syncInterval The sync interval, in bytes, for the writer file.
      */
     protected RecordWriter<AvroKey<T>, NullWritable> create(
         Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec,
-        OutputStream outputStream) throws IOException {
-      return new AvroKeyRecordWriter<T>(writerSchema, dataModel, compressionCodec, outputStream);
+        OutputStream outputStream, int syncInterval) throws IOException {
+      return new AvroKeyRecordWriter<T>(writerSchema, dataModel, compressionCodec, outputStream, syncInterval);
     }
   }
 
@@ -103,6 +104,6 @@ public class AvroKeyOutputFormat<T> exte
 
     return mRecordWriterFactory.create
       (writerSchema, dataModel, getCompressionCodec(context),
-       getAvroFileOutputStream(context));
+       getAvroFileOutputStream(context), getSyncInterval(context));
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Fri Dec 13 23:50:22 2013
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.mapred.AvroKey;
@@ -45,15 +46,30 @@ public class AvroKeyRecordWriter<T> exte
    * @param writerSchema The writer schema for the records in the Avro container file.
    * @param compressionCodec A compression codec factory for the Avro container file.
    * @param outputStream The output stream to write the Avro container file to.
+   * @param syncInterval The sync interval, in bytes, for the Avro container file.
    * @throws IOException If the record writer cannot be opened.
    */
   public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel,
-      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+      CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException {
     // Create an Avro container file and a writer to it.
     mAvroFileWriter = new DataFileWriter<T>(dataModel.createDatumWriter(writerSchema));
     mAvroFileWriter.setCodec(compressionCodec);
+    mAvroFileWriter.setSyncInterval(syncInterval);
     mAvroFileWriter.create(writerSchema, outputStream);
   }
+  /**
+   * Constructor that uses the default sync interval.
+   *
+   * @param writerSchema The writer schema for the records in the Avro container file.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @throws IOException If the record writer cannot be opened.
+   */
+  public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel,
+      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+    this(writerSchema, dataModel, compressionCodec, outputStream, 
+        DataFileConstants.DEFAULT_SYNC_INTERVAL);
+  }
 
   /** {@inheritDoc} */
   @Override

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -60,6 +60,7 @@ public class AvroKeyValueOutputFormat<K,
     GenericData dataModel = AvroSerialization.createDataModel(conf);
 
     return new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter,
-        dataModel, getCompressionCodec(context), getAvroFileOutputStream(context));
+        dataModel, getCompressionCodec(context), getAvroFileOutputStream(context),
+        getSyncInterval(context));
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Fri Dec 13 23:50:22 2013
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.avro.hadoop.io.AvroDatumConverter;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -59,9 +60,20 @@ public class AvroKeyValueRecordWriter<K,
   /** A helper object that converts the input value to an Avro datum. */
   private final AvroDatumConverter<V, ?> mValueConverter;
 
+  /**
+   * Constructor.
+   *
+   * @param keyConverter A key to Avro datum converter.
+   * @param valueConverter A value to Avro datum converter.
+   * @param dataModel The data model for key and value.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @param syncInterval The sync interval, in bytes, for the Avro container file.
+   * @throws IOException If the record writer cannot be opened.
+   */
   public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
       AvroDatumConverter<V, ?> valueConverter, GenericData dataModel,
-      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+      CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException {
     // Create the generic record schema for the key/value pair.
     mKeyValuePairSchema = AvroKeyValue.getSchema(
         keyConverter.getWriterSchema(), valueConverter.getWriterSchema());
@@ -70,6 +82,7 @@ public class AvroKeyValueRecordWriter<K,
     mAvroFileWriter = new DataFileWriter<GenericRecord>(
         dataModel.createDatumWriter(mKeyValuePairSchema));
     mAvroFileWriter.setCodec(compressionCodec);
+    mAvroFileWriter.setSyncInterval(syncInterval);
     mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
 
     // Keep a reference to the converters.
@@ -81,6 +94,23 @@ public class AvroKeyValueRecordWriter<K,
   }
 
   /**
+   * Constructor that uses the default sync interval.
+   *
+   * @param keyConverter A key to Avro datum converter.
+   * @param valueConverter A value to Avro datum converter.
+   * @param dataModel The data model for key and value.
+   * @param compressionCodec A compression codec factory for the Avro container file.
+   * @param outputStream The output stream to write the Avro container file to.
+   * @throws IOException If the record writer cannot be opened.
+   */
+  public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
+      AvroDatumConverter<V, ?> valueConverter, GenericData dataModel,
+      CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+    this(keyConverter, valueConverter, dataModel, compressionCodec, outputStream, 
+        DataFileConstants.DEFAULT_SYNC_INTERVAL);
+  }
+  
+  /**
    * Gets the writer schema for the key/value pair generic record.
    *
    * @return The writer schema used for entries of the Avro container file.

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java Fri Dec 13 23:50:22 2013
@@ -90,6 +90,17 @@ public abstract class AvroOutputFormatBa
       getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
     return path.getFileSystem(context.getConfiguration()).create(path);
   }
- 
-   
+
+  /**
+   * Gets the configured sync interval from the task context, or the default if unset.
+   *
+   * @param context The task attempt context.
+   * @return The sync interval to use for the output Avro container file.
+   */
+  protected static int getSyncInterval(TaskAttemptContext context) {
+    return context.getConfiguration().getInt(
+          org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+          DataFileConstants.DEFAULT_SYNC_INTERVAL);
+  }
+
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -43,13 +43,17 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 public class TestAvroKeyOutputFormat {
+  private static final String SYNC_INTERVAL_KEY = org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+  private static final int TEST_SYNC_INTERVAL = 12345;
+
   @Rule
   public TemporaryFolder mTempDir = new TemporaryFolder();
 
   @Test
   public void testWithNullCodec() throws IOException {
     Configuration conf = new Configuration();
-    testGetRecordWriter(conf, CodecFactory.nullCodec());
+    conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+    testGetRecordWriter(conf, CodecFactory.nullCodec(), TEST_SYNC_INTERVAL);
   }
 
   @Test
@@ -57,7 +61,7 @@ public class TestAvroKeyOutputFormat {
     Configuration conf = new Configuration();
     conf.setBoolean("mapred.output.compress", true);
     conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, 3);
-    testGetRecordWriter(conf, CodecFactory.deflateCodec(3));
+    testGetRecordWriter(conf, CodecFactory.deflateCodec(3), DataFileConstants.DEFAULT_SYNC_INTERVAL);
   }
 
   @Test
@@ -65,7 +69,8 @@ public class TestAvroKeyOutputFormat {
     Configuration conf = new Configuration();
     conf.setBoolean("mapred.output.compress", true);
     conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
-    testGetRecordWriter(conf, CodecFactory.snappyCodec());
+    conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+    testGetRecordWriter(conf, CodecFactory.snappyCodec(), TEST_SYNC_INTERVAL);
   }
 
   @Test
@@ -73,7 +78,7 @@ public class TestAvroKeyOutputFormat {
     Configuration conf = new Configuration();
     conf.setBoolean("mapred.output.compress", true);
     conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.BZIP2_CODEC);
-    testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+    testGetRecordWriter(conf, CodecFactory.bzip2Codec(), DataFileConstants.DEFAULT_SYNC_INTERVAL);
   }
 
   @Test
@@ -82,7 +87,8 @@ public class TestAvroKeyOutputFormat {
     conf.setBoolean("mapred.output.compress", true);
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.DeflateCodec");
     conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, -1);
-    testGetRecordWriter(conf, CodecFactory.deflateCodec(-1));
+    conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+    testGetRecordWriter(conf, CodecFactory.deflateCodec(-1), TEST_SYNC_INTERVAL);
   }
 
   @Test
@@ -90,7 +96,7 @@ public class TestAvroKeyOutputFormat {
     Configuration conf = new Configuration();
     conf.setBoolean("mapred.output.compress", true);
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
-    testGetRecordWriter(conf, CodecFactory.snappyCodec());
+    testGetRecordWriter(conf, CodecFactory.snappyCodec(), DataFileConstants.DEFAULT_SYNC_INTERVAL);
   }
 
   @Test
@@ -98,13 +104,14 @@ public class TestAvroKeyOutputFormat {
     Configuration conf = new Configuration();
     conf.setBoolean("mapred.output.compress", true);
     conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.BZip2Codec");
-    testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+    conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+    testGetRecordWriter(conf, CodecFactory.bzip2Codec(), TEST_SYNC_INTERVAL);
   }
 
   /**
    * Tests that the record writer is constructed and returned correctly from the output format.
    */
-  private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec)
+  private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec, int expectedSyncInterval)
       throws IOException {
     // Configure a mock task attempt context.
     Job job = new Job(conf);
@@ -131,7 +138,8 @@ public class TestAvroKeyOutputFormat {
     expect(recordWriterFactory.create(eq(writerSchema),
         anyObject(GenericData.class),
         capture(capturedCodecFactory),  // Capture for comparison later.
-        anyObject(OutputStream.class))).andReturn(expectedRecordWriter);
+        anyObject(OutputStream.class),
+        eq(expectedSyncInterval))).andReturn(expectedRecordWriter);
 
     replay(context);
     replay(expectedRecordWriter);