You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by to...@apache.org on 2012/09/27 15:25:48 UTC

svn commit: r1391001 - in /avro/trunk: ./ lang/java/ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/ lang/java/mapred/src/main/java/org/apache/avro/mapreduce/ lang/java/mapred/src/main/java/org/apache/hadoop/io/ lang/java/ma...

Author: tomwhite
Date: Thu Sep 27 13:25:47 2012
New Revision: 1391001

URL: http://svn.apache.org/viewvc?rev=1391001&view=rev
Log:
AVRO-1170. Java: Avro's new mapreduce APIs don't work with Hadoop 2.

Removed:
    avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/build.sh
    avro/trunk/lang/java/mapred/pom.xml
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
    avro/trunk/lang/java/pom.xml
    avro/trunk/lang/java/trevni/avro/pom.xml

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Sep 27 13:25:47 2012
@@ -14,6 +14,9 @@ Trunk (not yet released)
     AVRO-1171. Java: Don't call configure() twice on mappers & reducers.
     (Dave Beech via cutting)
 
+    AVRO-1170. Java: Avro's new mapreduce APIs don't work with Hadoop 2.
+    (tomwhite)
+
 Avro 1.7.2 (20 October 2012)
 
   NEW FEATURES

Modified: avro/trunk/build.sh
URL: http://svn.apache.org/viewvc/avro/trunk/build.sh?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/build.sh (original)
+++ avro/trunk/build.sh Thu Sep 27 13:25:47 2012
@@ -93,7 +93,8 @@ case "$target" in
 
 	# build lang-specific artifacts
         
-	(cd lang/java; mvn -P dist package -DskipTests -Davro.version=$VERSION javadoc:aggregate) 
+	(cd lang/java; mvn package -DskipTests -Dhadoop.version=2;
+	  mvn -P dist package -DskipTests -Davro.version=$VERSION javadoc:aggregate) 
         (cd lang/java/trevni/doc; mvn site)
         (mvn -N -P copy-artifacts antrun:run) 
 

Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Thu Sep 27 13:25:47 2012
@@ -76,6 +76,25 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>main</id>
+            <goals><goal>jar</goal></goals>
+            <phase>package</phase>
+          </execution>
+          <execution>
+            <id>with-classifier</id>
+            <goals><goal>jar</goal></goals>
+            <phase>package</phase>
+            <configuration>
+              <classifier>${envClassifier}</classifier>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -85,14 +104,7 @@
       <artifactId>avro-ipc</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-      <!-- hadoop's execution environment provides its own jars, usurping any others.
-        So we should not include it here -->
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
+    <dependency>     
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
       <version>${easymock.version}</version>
@@ -115,6 +127,64 @@
       <version>${jackson.version}</version>
     </dependency>
   </dependencies>
+  
+  <profiles>
+     <profile>
+      <id>hadoop1</id>
+      <activation>
+        <property>
+          <name>!hadoop.version</name> <!-- if no hadoop.version is set -->
+        </property>
+      </activation>
+      <properties>
+        <envClassifier>hadoop1</envClassifier>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop1.version}</version>
+          <!-- hadoop's execution environment provides its own jars, usurping any others.
+            So we should not include it here -->
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop2</id>
+      <activation>
+        <property>
+          <name>hadoop.version</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <properties>
+        <envClassifier>hadoop2</envClassifier>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop2.version}</version>
+          <!-- hadoop's execution environment provides its own jars, usurping any others.
+            So we should not include it here -->
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-common</artifactId>
+        <version>${hadoop2.version}</version>
+          <scope>test</scope> <!-- for LocalJobRunner -->
+        </dependency>
+        <dependency>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+          <version>${commons-httpclient.version}</version>
+          <scope>test</scope> <!-- for LocalJobRunner -->
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 
 </project>
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java Thu Sep 27 13:25:47 2012
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFileBase;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -69,7 +68,7 @@ import org.slf4j.LoggerFactory;
  *   <li>store the Avro key and value schemas in the SequenceFile <i>header</i>.</li>
  * </ul>
  */
-public class AvroSequenceFile extends SequenceFileBase {
+public class AvroSequenceFile {
   private static final Logger LOG = LoggerFactory.getLogger(AvroSequenceFile.class);
 
  /** The SequenceFile.Metadata field for the Avro key writer schema. */
@@ -93,17 +92,13 @@ public class AvroSequenceFile extends Se
    * @throws IOException If the writer cannot be created.
    */
   public static SequenceFile.Writer createWriter(Writer.Options options) throws IOException {
-    switch (options.getCompressionType()) {
-    case NONE:
-      return new Writer(options);
-    case RECORD:
-      return new RecordCompressWriter(options);
-    case BLOCK:
-      return new BlockCompressWriter(options);
-    default:
-      throw new IllegalArgumentException(
-          "Invalid compression type: " + options.getCompressionType());
-    }
+    return SequenceFile.createWriter(
+        options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
+        options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
+        options.getBufferSizeBytes(), options.getReplicationFactor(),
+        options.getBlockSizeBytes(), 
+        options.getCompressionType(), options.getCompressionCodec(),
+        options.getProgressable(), options.getMetadataWithAvroSchemas());
   }
 
   /**
@@ -543,44 +538,6 @@ public class AvroSequenceFile extends Se
   }
 
   /**
-   * A Writer for Avro-enabled SequenceFiles using record-level compression.
-   */
-  public static class RecordCompressWriter extends RecordCompressWriterBase {
-    /**
-     * Creates a new <code>RecordCompressWriter</code> to a SequenceFile that supports Avro data.
-     *
-     * @param options The writer options.
-     * @throws IOException If the writer cannot be initialized.
-     */
-    public RecordCompressWriter(Writer.Options options) throws IOException {
-      super(options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
-          options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
-          options.getBufferSizeBytes(), options.getReplicationFactor(),
-          options.getBlockSizeBytes(), options.getCompressionCodec(),
-          options.getProgressable(), options.getMetadataWithAvroSchemas());
-    }
-  }
-
-  /**
-   * A Writer for Avro-enabled SequenceFiles using block-level compression.
-   */
-  public static class BlockCompressWriter extends BlockCompressWriterBase {
-    /**
-     * Creates a new <code>BlockCompressWriter</code> to a SequenceFile that supports Avro data.
-     *
-     * @param options The writer options.
-     * @throws IOException If the writer cannot be initialized.
-     */
-    public BlockCompressWriter(Writer.Options options) throws IOException {
-      super(options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
-          options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
-          options.getBufferSizeBytes(), options.getReplicationFactor(),
-          options.getBlockSizeBytes(), options.getCompressionCodec(),
-          options.getProgressable(), options.getMetadataWithAvroSchemas());
-    }
-  }
-
-  /**
    * A reader for SequenceFiles that may contain Avro data.
    */
   public static class Reader extends SequenceFile.Reader {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java Thu Sep 27 13:25:47 2012
@@ -18,6 +18,7 @@
 package org.apache.avro.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.avro.Schema;
@@ -422,7 +424,7 @@ public class AvroMultipleOutputs{
   public void write(Object key, Object value, String baseOutputPath) 
       throws IOException, InterruptedException {
     checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = new TaskAttemptContext(
+    TaskAttemptContext taskContext = createTaskAttemptContext(
       context.getConfiguration(), context.getTaskAttemptID());
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
@@ -495,7 +497,7 @@ public class AvroMultipleOutputs{
       else
         AvroJob.setOutputValueSchema(job,valSchema);
     }
-    taskContext = new TaskAttemptContext(
+    taskContext = createTaskAttemptContext(
       job.getConfiguration(), context.getTaskAttemptID());
     
     taskContexts.put(nameOutput, taskContext);
@@ -503,6 +505,34 @@ public class AvroMultipleOutputs{
     return taskContext;
   }
   
+  private TaskAttemptContext createTaskAttemptContext(Configuration conf, 
+      TaskAttemptID taskId) {
+    // Use reflection since the context types changed incompatibly between 1.0
+    // and 2.0.
+    try {
+      Class<?> c = getTaskAttemptContextClass();
+      Constructor<?> cons = c.getConstructor(Configuration.class,
+          TaskAttemptID.class);
+      return (TaskAttemptContext) cons.newInstance(conf, taskId);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+  private Class<?> getTaskAttemptContextClass() {
+    try {
+      return Class.forName(
+          "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+    } catch (Exception e) {
+      try {
+        return Class.forName(
+            "org.apache.hadoop.mapreduce.TaskAttemptContext");
+      } catch (Exception ex) {
+        throw new IllegalStateException(ex);
+      }
+    }
+  }
+  
   /**
    * Closes all the opened outputs.
    * 

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Thu Sep 27 13:25:47 2012
@@ -81,7 +81,7 @@ public class TestAvroKeyOutputFormat {
     expect(context.getConfiguration())
         .andReturn(job.getConfiguration()).anyTimes();
     expect(context.getTaskAttemptID())
-        .andReturn(new TaskAttemptID("id", 1, true, 1, 1))
+        .andReturn(TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"))
         .anyTimes();
 
     // Create a mock record writer.

Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Thu Sep 27 13:25:47 2012
@@ -37,7 +37,13 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
     <!-- version properties for dependencies -->
-    <hadoop.version>0.20.205.0</hadoop.version>
+    
+    <!--
+      To build the avro-mapred module against Hadoop 2 specify
+      -Dhadoop.version=2 or leave unspecified to build against Hadoop 1
+    -->
+    <hadoop1.version>0.20.205.0</hadoop1.version>
+    <hadoop2.version>2.0.1-alpha</hadoop2.version>
     <jackson.version>1.8.8</jackson.version>
     <jetty.version>6.1.26</jetty.version>
     <jetty-servlet-api.version>2.5-20081211</jetty-servlet-api.version>
@@ -55,6 +61,7 @@
     <commons-lang.version>2.6</commons-lang.version>
     <easymock.version>3.0</easymock.version>
     <hamcrest.version>1.1</hamcrest.version>
+    <commons-httpclient.version>3.1</commons-httpclient.version>
 
     <!-- version properties for plugins -->
     <checkstyle-plugin.version>2.8</checkstyle-plugin.version>
@@ -228,7 +235,7 @@
             <links>
               <link>http://jackson.codehaus.org/${jackson.version}/javadoc/</link>
               <link>http://java.sun.com/products/servlet/2.3/javadoc/</link>
-              <link>http://hadoop.apache.org/common/docs/r${hadoop.version}/api/</link>
+              <link>http://hadoop.apache.org/common/docs/r${hadoop1.version}/api/</link>
             </links>
           </configuration>
         </plugin>
@@ -345,7 +352,7 @@
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>
-        <version>${hadoop.version}</version>
+        <version>${hadoop1.version}</version>
       </dependency>
       <dependency>
         <groupId>org.xerial.snappy</groupId>

Modified: avro/trunk/lang/java/trevni/avro/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/pom.xml (original)
+++ avro/trunk/lang/java/trevni/avro/pom.xml Thu Sep 27 13:25:47 2012
@@ -59,7 +59,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>${hadoop.version}</version>
+      <version>${hadoop1.version}</version>
       <scope>compile</scope>
     </dependency>
   </dependencies>