You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/18 19:27:49 UTC

svn commit: r1447419 - in /avro/trunk: CHANGES.txt lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java

Author: cutting
Date: Mon Feb 18 18:27:49 2013
New Revision: 1447419

URL: http://svn.apache.org/r1447419
Log:
AVRO-1215. Java: Fix AvroMultipleOutputs when specifying baseOutputPath.  Contributed by Ashish Nagavaram.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Feb 18 18:27:49 2013
@@ -82,6 +82,9 @@ Trunk (not yet released)
     AVRO-1247. Java: Fix Requestor and Responder implementations to
     use correct ClassLoader. (cutting)
 
+    AVRO-1215. Java: Fix AvroMultipleOutputs when specifying baseOutputPath.
+    (Ashish Nagavaram via cutting)
+
 Avro 1.7.3 (6 December 2012)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java Mon Feb 18 18:27:49 2013
@@ -144,16 +144,6 @@ public class AvroMultipleOutputs{
   private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
 
   /**
-   * Cache for the Key Schemas
-   */
-  private static Map<String, Schema> keySchemas = new HashMap<String, Schema>();
-
-  /**
-   * Cache for the Value Schemas
-   */
-  private static Map<String, Schema> valSchemas = new HashMap<String, Schema>();
-
-  /**
    * Checks if a named output name is valid token.
    *
    * @param namedOutput named output Name
@@ -272,9 +262,10 @@ public class AvroMultipleOutputs{
       conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
     conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
       OutputFormat.class);
-    keySchemas.put(namedOutput+"_KEYSCHEMA",keySchema);
-    valSchemas.put(namedOutput+"_VALSCHEMA",valueSchema);
-  
+    conf.set(MO_PREFIX+namedOutput+".keyschema", keySchema.toString());
+    if(valueSchema!=null){
+      conf.set(MO_PREFIX+namedOutput+".valueschema",valueSchema.toString());
+    }
   }
 
   /**
@@ -420,15 +411,35 @@ public class AvroMultipleOutputs{
    * @param baseOutputPath base-output path to write the record to.
    * Note: Framework will generate unique filename for the baseOutputPath
    */
-  @SuppressWarnings("unchecked")
   public void write(Object key, Object value, String baseOutputPath) 
       throws IOException, InterruptedException {
+        write(key, value, null, null, baseOutputPath);
+  }
+  
+  /**
+   * Write a key and value to an output file name.
+   * 
+   * Gets the record writer from job's output format. Job's output format should
+   * be a FileOutputFormat.
+   * 
+   * @param key   the key
+   * @param value the value
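+   * @param keySchema   schema to use for the key, or null to keep the job's configured key schema
+   * @param valSchema   schema to use for the value, or null to keep the job's configured value schema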
+   * @param baseOutputPath base-output path to write the record to. Note: the framework will
+   *          generate a unique filename for the baseOutputPath
+   */
+  @SuppressWarnings("unchecked")
+  public void write(Object key, Object value, Schema keySchema,
+      Schema valSchema, String baseOutputPath) throws IOException,
+      InterruptedException {
     checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = createTaskAttemptContext(
-      context.getConfiguration(), context.getTaskAttemptID());
+    Job job = new Job(context.getConfiguration());
+    setSchema(job, keySchema, valSchema);
+    TaskAttemptContext taskContext = createTaskAttemptContext(job.getConfiguration(), context.getTaskAttemptID());
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
-
+    
   // by being synchronized MultipleOutputTask can be use with a
   // MultithreadedMapper.
   @SuppressWarnings("unchecked")
@@ -443,6 +454,7 @@ public class AvroMultipleOutputs{
     if (writer == null) {
       // get the record writer from context output format
       //FileOutputFormat.setOutputName(taskContext, baseFileName);
+      taskContext.getConfiguration().set("avro.mo.config.namedOutput",baseFileName);
       try {
         writer = ((OutputFormat) ReflectionUtils.newInstance(
           taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
@@ -463,8 +475,27 @@ public class AvroMultipleOutputs{
     return writer;
   }
 
+  private void setSchema(Job job, Schema keySchema, Schema valSchema) {
+
+    boolean isMaponly = job.getNumReduceTasks() == 0;
+    if (keySchema != null) {
+      if (isMaponly)
+        AvroJob.setMapOutputKeySchema(job, keySchema);
+      else
+        AvroJob.setOutputKeySchema(job, keySchema);
+    }
+    if (valSchema != null) {
+      if (isMaponly)
+        AvroJob.setMapOutputValueSchema(job, valSchema);
+      else
+        AvroJob.setOutputValueSchema(job, valSchema);
+    }
+
+  }
+
    // Create a taskAttemptContext for the named output with 
    // output format and output key/value types put in the context
+  @SuppressWarnings("deprecation")
   private TaskAttemptContext getContext(String nameOutput) throws IOException {
 
     TaskAttemptContext taskContext = taskContexts.get(nameOutput);
@@ -475,28 +506,17 @@ public class AvroMultipleOutputs{
 
     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    context.getConfiguration().set("avro.mo.config.namedOutput",nameOutput);
     Job job = new Job(context.getConfiguration());
     job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
-    Schema keySchema = keySchemas.get(nameOutput+"_KEYSCHEMA");
-    Schema valSchema = valSchemas.get(nameOutput+"_VALSCHEMA");
-
-    boolean isMaponly=job.getNumReduceTasks() == 0;
-
-    if(keySchema!=null)
-    {
-      if(isMaponly)
-        AvroJob.setMapOutputKeySchema(job,keySchema);
-      else
-        AvroJob.setOutputKeySchema(job,keySchema);
-    }
-    if(valSchema!=null)
-    {
-      if(isMaponly)
-        AvroJob.setMapOutputValueSchema(job,valSchema);
-      else
-        AvroJob.setOutputValueSchema(job,valSchema);
-    }
+    Schema keySchema=null,valSchema=null;
+    if (job.getConfiguration().get(MO_PREFIX + nameOutput + ".keyschema",null) != null)
+        keySchema = Schema.parse(job.getConfiguration().get(
+            MO_PREFIX + nameOutput + ".keyschema"));
+      if (job.getConfiguration().get(MO_PREFIX + nameOutput + ".valueschema",
+          null) != null)
+        valSchema = Schema.parse(job.getConfiguration().get(
+            MO_PREFIX + nameOutput + ".valueschema"));
+    setSchema(job, keySchema, valSchema);
     taskContext = createTaskAttemptContext(
       job.getConfiguration(), context.getTaskAttemptID());
     

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java?rev=1447419&r1=1447418&r2=1447419&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroMultipleOutputs.java Mon Feb 18 18:27:49 2013
@@ -127,7 +127,10 @@ public class TestAvroMultipleOutputs {
       record2.put("name1", new Utf8(line.toString()));
       record2.put("count1", new Integer(sum));
       mStats.datum(record2); 
+      amos.write(mStats, NullWritable.get(), STATS_SCHEMA_2, null, "testnewwrite2");
       amos.write("myavro1",mStats);
+      amos.write(mStats, NullWritable.get(), STATS_SCHEMA, null, "testnewwrite");
+      amos.write(mStats, NullWritable.get(), "testwritenonschema");
     }
    
     @Override
@@ -241,6 +244,52 @@ public class TestAvroMultipleOutputs {
     Assert.assertEquals(3, counts.get("apple").intValue());
     Assert.assertEquals(2, counts.get("banana").intValue());
     Assert.assertEquals(1, counts.get("carrot").intValue());
+  
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+            new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+       counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+    
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+        
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testnewwrite2-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA_2));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+     counts.put(((Utf8) record.get("name1")).toString(), (Integer) record.get("count1"));
+    }
+    reader.close();
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+    
+    outputFiles = fileSystem.globStatus(outputPath.suffix("/testwritenonschema-r-00000.avro"));
+    Assert.assertEquals(1, outputFiles.length);
+    reader = new DataFileReader<GenericData.Record>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new GenericDatumReader<GenericData.Record>(STATS_SCHEMA));
+    counts = new HashMap<String, Integer>();
+    for (GenericData.Record record : reader) {
+      counts.put(((Utf8) record.get("name")).toString(), (Integer) record.get("count"));
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+    
+    
   }
 
   @Test