You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/18 19:27:49 UTC
svn commit: r1447419 - in /avro/trunk: CHANGES.txt
lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
Author: cutting
Date: Mon Feb 18 18:27:49 2013
New Revision: 1447419
URL: http://svn.apache.org/r1447419
Log:
AVRO-1215. Java: Fix AvroMultipleOutputs when specifying baseOutputPath. Contributed by Ashish Nagavaram.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Feb 18 18:27:49 2013
@@ -82,6 +82,9 @@ Trunk (not yet released)
AVRO-1247. Java: Fix Requestor and Responder implementations to
use correct ClassLoader. (cutting)
+ AVRO-1215. Java: Fix AvroMultipleOutputs when specifying baseOutputPath.
+ (Ashish Nagavaram via cutting)
+
Avro 1.7.3 (6 December 2012)
NEW FEATURES
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java Mon Feb 18 18:27:49 2013
@@ -144,16 +144,6 @@ public class AvroMultipleOutputs{
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
/**
- * Cache for the Key Schemas
- */
- private static Map<String, Schema> keySchemas = new HashMap<String, Schema>();
-
- /**
- * Cache for the Value Schemas
- */
- private static Map<String, Schema> valSchemas = new HashMap<String, Schema>();
-
- /**
* Checks if a named output name is valid token.
*
* @param namedOutput named output Name
@@ -272,9 +262,10 @@ public class AvroMultipleOutputs{
conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
OutputFormat.class);
- keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
- valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
-
+ conf.set(MO_PREFIX+namedOutput+".keyschema", keySchema.toString());
+ if(valueSchema!=null){
+ conf.set(MO_PREFIX+namedOutput+".valueschema",valueSchema.toString());
+ }
}
/**
@@ -420,15 +411,35 @@ public class AvroMultipleOutputs{
* @param baseOutputPath base-output path to write the record to.
* Note: Framework will generate unique filename for the baseOutputPath
*/
- @SuppressWarnings("unchecked")
public void write(Object key, Object value, String baseOutputPath)
throws IOException, InterruptedException {
+ write(key, value, null, null, baseOutputPath);
+ }
+
+ /**
+ * Write key value to an output file name.
+ *
+ * Gets the record writer from job's output format. Job's output format should
+ * be a FileOutputFormat.
+ *
+ * @param key the key
+ * @param value the value
+ * @param keySchema keySchema to use
+ * @param valSchema ValueSchema to use
+ * @param baseOutputPath base-output path to write the record to. Note: Framework will
+ * generate unique filename for the baseOutputPath
+ */
+ @SuppressWarnings("unchecked")
+ public void write(Object key, Object value, Schema keySchema,
+ Schema valSchema, String baseOutputPath) throws IOException,
+ InterruptedException {
checkBaseOutputPath(baseOutputPath);
- TaskAttemptContext taskContext = createTaskAttemptContext(
- context.getConfiguration(), context.getTaskAttemptID());
+ Job job = new Job(context.getConfiguration());
+ setSchema(job, keySchema, valSchema);
+ TaskAttemptContext taskContext = createTaskAttemptContext(job.getConfiguration(), context.getTaskAttemptID());
getRecordWriter(taskContext, baseOutputPath).write(key, value);
}
-
+
// by being synchronized MultipleOutputTask can be use with a
// MultithreadedMapper.
@SuppressWarnings("unchecked")
@@ -443,6 +454,7 @@ public class AvroMultipleOutputs{
if (writer == null) {
// get the record writer from context output format
//FileOutputFormat.setOutputName(taskContext, baseFileName);
+ taskContext.getConfiguration().set("avro.mo.config.namedOutput",baseFileName);
try {
writer = ((OutputFormat) ReflectionUtils.newInstance(
taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
@@ -463,8 +475,27 @@ public class AvroMultipleOutputs{
return writer;
}
+ private void setSchema(Job job, Schema keySchema, Schema valSchema) {
+
+ boolean isMaponly = job.getNumReduceTasks() == 0;
+ if (keySchema != null) {
+ if (isMaponly)
+ AvroJob.setMapOutputKeySchema(job, keySchema);
+ else
+ AvroJob.setOutputKeySchema(job, keySchema);
+ }
+ if (valSchema != null) {
+ if (isMaponly)
+ AvroJob.setMapOutputValueSchema(job, valSchema);
+ else
+ AvroJob.setOutputValueSchema(job, valSchema);
+ }
+
+ }
+
// Create a taskAttemptContext for the named output with
// output format and output key/value types put in the context
+ @SuppressWarnings("deprecation")
private TaskAttemptContext getContext(String nameOutput) throws IOException {
TaskAttemptContext taskContext = taskContexts.get(nameOutput);
@@ -475,28 +506,17 @@ public class AvroMultipleOutputs{
// The following trick leverages the instantiation of a record writer via
// the job thus supporting arbitrary output formats.
- context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
Job job = new Job(context.getConfiguration());
job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
- Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
- Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
-
- boolean isMaponly=job.getNumReduceTasks() == 0;
-
- if(keySchema!=null)
- {
- if(isMaponly)
- AvroJob.setMapOutputKeySchema(job,keySchema);
- else
- AvroJob.setOutputKeySchema(job,keySchema);
- }
- if(valSchema!=null)
- {
- if(isMaponly)
- AvroJob.setMapOutputValueSchema(job,valSchema);
- else
- AvroJob.setOutputValueSchema(job,valSchema);
- }
+ Schema keySchema=null,valSchema=null;
+ if (job.getConfiguration().get(MO_PREFIX + nameOutput + ".keyschema",null) != null)
+ keySchema = Schema.parse(job.getConfiguration().get(
+ MO_PREFIX + nameOutput + ".keyschema"));
+ if (job.getConfiguration().get(MO_PREFIX + nameOutput + ".valueschema",
+ null) != null)
+ valSchema = Schema.parse(job.getConfiguration().get(
+ MO_PREFIX + nameOutput + ".valueschema"));
+ setSchema(job, keySchema, valSchema);
taskContext = createTaskAttemptContext(
job.getConfiguration(), context.getTaskAttemptID());
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java Mon Feb 18 18:27:49 2013
@@ -127,7 +127,10 @@ public class TestAvroMultipleOutputs {
record2.put("name1", new Utf8(line.toString()));
record2.put("count1", new Integer(sum));
mStats.datum(record2);
+ amos.write(mStats, NullWritable.get(), STATS_SCHEMA_2, null, "testnewwrite2");
amos.write("myavro1",mStats);
+ amos.write(mStats, NullWritable.get(), STATS_SCHEMA, null, "testnewwrite");
+ amos.write(mStats, NullWritable.get(), "testwritenonschema");
}
@Override
@@ -241,6 +244,52 @@ public class TestAvroMultipleOutputs {
Assert.assertEquals(3, counts.get("apple").intValue());
Assert.assertEquals(2, counts.get("banana").intValue());
Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite2-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name1")).toString(), (Integer) record.get("count1"));
+ }
+ reader.close();
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+ outputFiles = fileSystem.globStatus(outputPath.suffix("/testwritenonschema-r-00000.avro"));
+ Assert.assertEquals(1, outputFiles.length);
+ reader = new DataFileReader<GenericData.Record>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+ counts = new HashMap<String, Integer>();
+ for (GenericData.Record record : reader) {
+ counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+
+
}
@Test