You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/12/14 00:50:22 UTC
svn commit: r1550852 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/test/java/org/apache/avro/mapreduce/
Author: cutting
Date: Fri Dec 13 23:50:22 2013
New Revision: 1550852
URL: http://svn.apache.org/r1550852
Log:
AVRO-1344. Java: Expose sync interval configuration in mapreduce API. Contributed by Rob Turner.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 13 23:50:22 2013
@@ -41,6 +41,9 @@ Trunk (not yet released)
AVRO-1234. Java: Permit AvroInputFormat to process files whose
names don't end in .avro. (Dave Beech & Sandy Ryza via cutting)
+ AVRO-1344. Java: Expose sync interval configuration in mapreduce API.
+ (Rob Turner via cutting)
+
BUG FIXES
AVRO-1368. Fix SpecificDatumWriter to, when writing a string
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -71,11 +71,12 @@ public class AvroKeyOutputFormat<T> exte
* @param writerSchema The writer schema for the records to write.
* @param compressionCodec The compression type for the writer file.
* @param outputStream The target output stream for the records.
+ * @param syncInterval The sync interval for the writer file.
*/
protected RecordWriter<AvroKey<T>, NullWritable> create(
Schema writerSchema, GenericData dataModel, CodecFactory compressionCodec,
- OutputStream outputStream) throws IOException {
- return new AvroKeyRecordWriter<T>(writerSchema, dataModel, compressionCodec, outputStream);
+ OutputStream outputStream, int syncInterval) throws IOException {
+ return new AvroKeyRecordWriter<T>(writerSchema, dataModel, compressionCodec, outputStream, syncInterval);
}
}
@@ -103,6 +104,6 @@ public class AvroKeyOutputFormat<T> exte
return mRecordWriterFactory.create
(writerSchema, dataModel, getCompressionCodec(context),
- getAvroFileOutputStream(context));
+ getAvroFileOutputStream(context), getSyncInterval(context));
}
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Fri Dec 13 23:50:22 2013
@@ -23,6 +23,7 @@ import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroKey;
@@ -45,15 +46,30 @@ public class AvroKeyRecordWriter<T> exte
* @param writerSchema The writer schema for the records in the Avro container file.
* @param compressionCodec A compression codec factory for the Avro container file.
* @param outputStream The output stream to write the Avro container file to.
+ * @param syncInterval The sync interval for the Avro container file.
* @throws IOException If the record writer cannot be opened.
*/
public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel,
- CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+ CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException {
// Create an Avro container file and a writer to it.
mAvroFileWriter = new DataFileWriter<T>(dataModel.createDatumWriter(writerSchema));
mAvroFileWriter.setCodec(compressionCodec);
+ mAvroFileWriter.setSyncInterval(syncInterval);
mAvroFileWriter.create(writerSchema, outputStream);
}
+ /**
+ * Constructor.
+ *
+ * @param writerSchema The writer schema for the records in the Avro container file.
+ * @param compressionCodec A compression codec factory for the Avro container file.
+ * @param outputStream The output stream to write the Avro container file to.
+ * @throws IOException If the record writer cannot be opened.
+ */
+ public AvroKeyRecordWriter(Schema writerSchema, GenericData dataModel,
+ CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+ this(writerSchema, dataModel, compressionCodec, outputStream,
+ DataFileConstants.DEFAULT_SYNC_INTERVAL);
+ }
/** {@inheritDoc} */
@Override
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -60,6 +60,7 @@ public class AvroKeyValueOutputFormat<K,
GenericData dataModel = AvroSerialization.createDataModel(conf);
return new AvroKeyValueRecordWriter<K, V>(keyConverter, valueConverter,
- dataModel, getCompressionCodec(context), getAvroFileOutputStream(context));
+ dataModel, getCompressionCodec(context), getAvroFileOutputStream(context),
+ getSyncInterval(context));
}
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java Fri Dec 13 23:50:22 2013
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -59,9 +60,20 @@ public class AvroKeyValueRecordWriter<K,
/** A helper object that converts the input value to an Avro datum. */
private final AvroDatumConverter<V, ?> mValueConverter;
+ /**
+ * Constructor.
+ *
+ * @param keyConverter A key to Avro datum converter.
+ * @param valueConverter A value to Avro datum converter.
+ * @param dataModel The data model for key and value.
+ * @param compressionCodec A compression codec factory for the Avro container file.
+ * @param outputStream The output stream to write the Avro container file to.
+ * @param syncInterval The sync interval for the Avro container file.
+ * @throws IOException If the record writer cannot be opened.
+ */
public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
AvroDatumConverter<V, ?> valueConverter, GenericData dataModel,
- CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+ CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException {
// Create the generic record schema for the key/value pair.
mKeyValuePairSchema = AvroKeyValue.getSchema(
keyConverter.getWriterSchema(), valueConverter.getWriterSchema());
@@ -70,6 +82,7 @@ public class AvroKeyValueRecordWriter<K,
mAvroFileWriter = new DataFileWriter<GenericRecord>(
dataModel.createDatumWriter(mKeyValuePairSchema));
mAvroFileWriter.setCodec(compressionCodec);
+ mAvroFileWriter.setSyncInterval(syncInterval);
mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
// Keep a reference to the converters.
@@ -81,6 +94,23 @@ public class AvroKeyValueRecordWriter<K,
}
/**
+ * Constructor.
+ *
+ * @param keyConverter A key to Avro datum converter.
+ * @param valueConverter A value to Avro datum converter.
+ * @param dataModel The data model for key and value.
+ * @param compressionCodec A compression codec factory for the Avro container file.
+ * @param outputStream The output stream to write the Avro container file to.
+ * @throws IOException If the record writer cannot be opened.
+ */
+ public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
+ AvroDatumConverter<V, ?> valueConverter, GenericData dataModel,
+ CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
+ this(keyConverter, valueConverter, dataModel, compressionCodec, outputStream,
+ DataFileConstants.DEFAULT_SYNC_INTERVAL);
+ }
+
+ /**
* Gets the writer schema for the key/value pair generic record.
*
* @return The writer schema used for entries of the Avro container file.
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java Fri Dec 13 23:50:22 2013
@@ -90,6 +90,17 @@ public abstract class AvroOutputFormatBa
getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
return path.getFileSystem(context.getConfiguration()).create(path);
}
-
-
+
+ /**
+ * Gets the configured sync interval from the task context.
+ *
+ * @param context The task attempt context.
+ * @return The sync interval to use for the output Avro container file.
+ */
+ protected static int getSyncInterval(TaskAttemptContext context) {
+ return context.getConfiguration().getInt(
+ org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+ DataFileConstants.DEFAULT_SYNC_INTERVAL);
+ }
+
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1550852&r1=1550851&r2=1550852&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Fri Dec 13 23:50:22 2013
@@ -43,13 +43,17 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestAvroKeyOutputFormat {
+ private static final String SYNC_INTERVAL_KEY = org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+ private static final int TEST_SYNC_INTERVAL = 12345;
+
@Rule
public TemporaryFolder mTempDir = new TemporaryFolder();
@Test
public void testWithNullCodec() throws IOException {
Configuration conf = new Configuration();
- testGetRecordWriter(conf, CodecFactory.nullCodec());
+ conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+ testGetRecordWriter(conf, CodecFactory.nullCodec(), TEST_SYNC_INTERVAL);
}
@Test
@@ -57,7 +61,7 @@ public class TestAvroKeyOutputFormat {
Configuration conf = new Configuration();
conf.setBoolean("mapred.output.compress", true);
conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, 3);
- testGetRecordWriter(conf, CodecFactory.deflateCodec(3));
+ testGetRecordWriter(conf, CodecFactory.deflateCodec(3), DataFileConstants.DEFAULT_SYNC_INTERVAL);
}
@Test
@@ -65,7 +69,8 @@ public class TestAvroKeyOutputFormat {
Configuration conf = new Configuration();
conf.setBoolean("mapred.output.compress", true);
conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
- testGetRecordWriter(conf, CodecFactory.snappyCodec());
+ conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+ testGetRecordWriter(conf, CodecFactory.snappyCodec(), TEST_SYNC_INTERVAL);
}
@Test
@@ -73,7 +78,7 @@ public class TestAvroKeyOutputFormat {
Configuration conf = new Configuration();
conf.setBoolean("mapred.output.compress", true);
conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.BZIP2_CODEC);
- testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+ testGetRecordWriter(conf, CodecFactory.bzip2Codec(), DataFileConstants.DEFAULT_SYNC_INTERVAL);
}
@Test
@@ -82,7 +87,8 @@ public class TestAvroKeyOutputFormat {
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.DeflateCodec");
conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, -1);
- testGetRecordWriter(conf, CodecFactory.deflateCodec(-1));
+ conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+ testGetRecordWriter(conf, CodecFactory.deflateCodec(-1), TEST_SYNC_INTERVAL);
}
@Test
@@ -90,7 +96,7 @@ public class TestAvroKeyOutputFormat {
Configuration conf = new Configuration();
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
- testGetRecordWriter(conf, CodecFactory.snappyCodec());
+ testGetRecordWriter(conf, CodecFactory.snappyCodec(), DataFileConstants.DEFAULT_SYNC_INTERVAL);
}
@Test
@@ -98,13 +104,14 @@ public class TestAvroKeyOutputFormat {
Configuration conf = new Configuration();
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.BZip2Codec");
- testGetRecordWriter(conf, CodecFactory.bzip2Codec());
+ conf.setInt(SYNC_INTERVAL_KEY, TEST_SYNC_INTERVAL);
+ testGetRecordWriter(conf, CodecFactory.bzip2Codec(), TEST_SYNC_INTERVAL);
}
/**
* Tests that the record writer is constructed and returned correctly from the output format.
*/
- private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec)
+ private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec, int expectedSyncInterval)
throws IOException {
// Configure a mock task attempt context.
Job job = new Job(conf);
@@ -131,7 +138,8 @@ public class TestAvroKeyOutputFormat {
expect(recordWriterFactory.create(eq(writerSchema),
anyObject(GenericData.class),
capture(capturedCodecFactory), // Capture for comparison later.
- anyObject(OutputStream.class))).andReturn(expectedRecordWriter);
+ anyObject(OutputStream.class),
+ eq(expectedSyncInterval))).andReturn(expectedRecordWriter);
replay(context);
replay(expectedRecordWriter);