You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by to...@apache.org on 2012/09/27 15:25:48 UTC
svn commit: r1391001 - in /avro/trunk: ./ lang/java/ lang/java/mapred/
lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/
lang/java/mapred/src/main/java/org/apache/avro/mapreduce/
lang/java/mapred/src/main/java/org/apache/hadoop/io/ lang/java/ma...
Author: tomwhite
Date: Thu Sep 27 13:25:47 2012
New Revision: 1391001
URL: http://svn.apache.org/viewvc?rev=1391001&view=rev
Log:
AVRO-1170. Java: Avro's new mapreduce APIs don't work with Hadoop 2.
Removed:
avro/trunk/lang/java/mapred/src/main/java/org/apache/hadoop/io/SequenceFileBase.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/build.sh
avro/trunk/lang/java/mapred/pom.xml
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
avro/trunk/lang/java/pom.xml
avro/trunk/lang/java/trevni/avro/pom.xml
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Sep 27 13:25:47 2012
@@ -14,6 +14,9 @@ Trunk (not yet released)
AVRO-1171. Java: Don't call configure() twice on mappers & reducers.
(Dave Beech via cutting)
+ AVRO-1170. Java: Avro's new mapreduce APIs don't work with Hadoop 2.
+ (tomwhite)
+
Avro 1.7.2 (20 October 2012)
NEW FEATURES
Modified: avro/trunk/build.sh
URL: http://svn.apache.org/viewvc/avro/trunk/build.sh?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/build.sh (original)
+++ avro/trunk/build.sh Thu Sep 27 13:25:47 2012
@@ -93,7 +93,8 @@ case "$target" in
# build lang-specific artifacts
- (cd lang/java; mvn -P dist package -DskipTests -Davro.version=$VERSION javadoc:aggregate)
+ (cd lang/java; mvn package -DskipTests -Dhadoop.version=2;
+ mvn -P dist package -DskipTests -Davro.version=$VERSION javadoc:aggregate)
(cd lang/java/trevni/doc; mvn site)
(mvn -N -P copy-artifacts antrun:run)
Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Thu Sep 27 13:25:47 2012
@@ -76,6 +76,25 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>main</id>
+ <goals><goal>jar</goal></goals>
+ <phase>package</phase>
+ </execution>
+ <execution>
+ <id>with-classifier</id>
+ <goals><goal>jar</goal></goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>${envClassifier}</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -85,14 +104,7 @@
<artifactId>avro-ipc</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <!-- hadoop's execution environment provides its own jars, usurping any others.
- So we should not include it here -->
- <scope>provided</scope>
- </dependency>
- <dependency>
+ <dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>${easymock.version}</version>
@@ -115,6 +127,64 @@
<version>${jackson.version}</version>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <activation>
+ <property>
+ <name>!hadoop.version</name> <!-- if no hadoop.version is set -->
+ </property>
+ </activation>
+ <properties>
+ <envClassifier>hadoop1</envClassifier>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop1.version}</version>
+ <!-- hadoop's execution environment provides its own jars, usurping any others.
+ So we should not include it here -->
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <activation>
+ <property>
+ <name>hadoop.version</name>
+ <value>2</value>
+ </property>
+ </activation>
+ <properties>
+ <envClassifier>hadoop2</envClassifier>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop2.version}</version>
+ <!-- hadoop's execution environment provides its own jars, usurping any others.
+ So we should not include it here -->
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop2.version}</version>
+ <scope>test</scope> <!-- for LocalJobRunner -->
+ </dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>${commons-httpclient.version}</version>
+ <scope>test</scope> <!-- for LocalJobRunner -->
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSequenceFile.java Thu Sep 27 13:25:47 2012
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFileBase;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -69,7 +68,7 @@ import org.slf4j.LoggerFactory;
* <li>store the Avro key and value schemas in the SequenceFile <i>header</i>.</li>
* </ul>
*/
-public class AvroSequenceFile extends SequenceFileBase {
+public class AvroSequenceFile {
private static final Logger LOG = LoggerFactory.getLogger(AvroSequenceFile.class);
/** The SequenceFile.Metadata field for the Avro key writer schema. */
@@ -93,17 +92,13 @@ public class AvroSequenceFile extends Se
* @throws IOException If the writer cannot be created.
*/
public static SequenceFile.Writer createWriter(Writer.Options options) throws IOException {
- switch (options.getCompressionType()) {
- case NONE:
- return new Writer(options);
- case RECORD:
- return new RecordCompressWriter(options);
- case BLOCK:
- return new BlockCompressWriter(options);
- default:
- throw new IllegalArgumentException(
- "Invalid compression type: " + options.getCompressionType());
- }
+ return SequenceFile.createWriter(
+ options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
+ options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
+ options.getBufferSizeBytes(), options.getReplicationFactor(),
+ options.getBlockSizeBytes(),
+ options.getCompressionType(), options.getCompressionCodec(),
+ options.getProgressable(), options.getMetadataWithAvroSchemas());
}
/**
@@ -543,44 +538,6 @@ public class AvroSequenceFile extends Se
}
/**
- * A Writer for Avro-enabled SequenceFiles using record-level compression.
- */
- public static class RecordCompressWriter extends RecordCompressWriterBase {
- /**
- * Creates a new <code>RecordCompressWriter</code> to a SequenceFile that supports Avro data.
- *
- * @param options The writer options.
- * @throws IOException If the writer cannot be initialized.
- */
- public RecordCompressWriter(Writer.Options options) throws IOException {
- super(options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
- options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
- options.getBufferSizeBytes(), options.getReplicationFactor(),
- options.getBlockSizeBytes(), options.getCompressionCodec(),
- options.getProgressable(), options.getMetadataWithAvroSchemas());
- }
- }
-
- /**
- * A Writer for Avro-enabled SequenceFiles using block-level compression.
- */
- public static class BlockCompressWriter extends BlockCompressWriterBase {
- /**
- * Creates a new <code>BlockCompressWriter</code> to a SequenceFile that supports Avro data.
- *
- * @param options The writer options.
- * @throws IOException If the writer cannot be initialized.
- */
- public BlockCompressWriter(Writer.Options options) throws IOException {
- super(options.getFileSystem(), options.getConfigurationWithAvroSerialization(),
- options.getOutputPath(), options.getKeyClass(), options.getValueClass(),
- options.getBufferSizeBytes(), options.getReplicationFactor(),
- options.getBlockSizeBytes(), options.getCompressionCodec(),
- options.getProgressable(), options.getMetadataWithAvroSchemas());
- }
- }
-
- /**
* A reader for SequenceFiles that may contain Avro data.
*/
public static class Reader extends SequenceFile.Reader {
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroMultipleOutputs.java Thu Sep 27 13:25:47 2012
@@ -18,6 +18,7 @@
package org.apache.avro.mapreduce;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.Schema;
@@ -422,7 +424,7 @@ public class AvroMultipleOutputs{
public void write(Object key, Object value, String baseOutputPath)
throws IOException, InterruptedException {
checkBaseOutputPath(baseOutputPath);
- TaskAttemptContext taskContext = new TaskAttemptContext(
+ TaskAttemptContext taskContext = createTaskAttemptContext(
context.getConfiguration(), context.getTaskAttemptID());
getRecordWriter(taskContext, baseOutputPath).write(key, value);
}
@@ -495,7 +497,7 @@ public class AvroMultipleOutputs{
else
AvroJob.setOutputValueSchema(job,valSchema);
}
- taskContext = new TaskAttemptContext(
+ taskContext = createTaskAttemptContext(
job.getConfiguration(), context.getTaskAttemptID());
taskContexts.put(nameOutput, taskContext);
@@ -503,6 +505,34 @@ public class AvroMultipleOutputs{
return taskContext;
}
+ private TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
+ // Use reflection since the context types changed incompatibly between 1.0
+ // and 2.0.
+ try {
+ Class<?> c = getTaskAttemptContextClass();
+ Constructor<?> cons = c.getConstructor(Configuration.class,
+ TaskAttemptID.class);
+ return (TaskAttemptContext) cons.newInstance(conf, taskId);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private Class<?> getTaskAttemptContextClass() {
+ try {
+ return Class.forName(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+ } catch (Exception e) {
+ try {
+ return Class.forName(
+ "org.apache.hadoop.mapreduce.TaskAttemptContext");
+ } catch (Exception ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+ }
+
/**
* Closes all the opened outputs.
*
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Thu Sep 27 13:25:47 2012
@@ -81,7 +81,7 @@ public class TestAvroKeyOutputFormat {
expect(context.getConfiguration())
.andReturn(job.getConfiguration()).anyTimes();
expect(context.getTaskAttemptID())
- .andReturn(new TaskAttemptID("id", 1, true, 1, 1))
+ .andReturn(TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"))
.anyTimes();
// Create a mock record writer.
Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Thu Sep 27 13:25:47 2012
@@ -37,7 +37,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- version properties for dependencies -->
- <hadoop.version>0.20.205.0</hadoop.version>
+
+ <!--
+ To build the avro-mapred module against Hadoop 2 specify
+ -Dhadoop.version=2 or leave unspecified to build against Hadoop 1
+ -->
+ <hadoop1.version>0.20.205.0</hadoop1.version>
+ <hadoop2.version>2.0.1-alpha</hadoop2.version>
<jackson.version>1.8.8</jackson.version>
<jetty.version>6.1.26</jetty.version>
<jetty-servlet-api.version>2.5-20081211</jetty-servlet-api.version>
@@ -55,6 +61,7 @@
<commons-lang.version>2.6</commons-lang.version>
<easymock.version>3.0</easymock.version>
<hamcrest.version>1.1</hamcrest.version>
+ <commons-httpclient.version>3.1</commons-httpclient.version>
<!-- version properties for plugins -->
<checkstyle-plugin.version>2.8</checkstyle-plugin.version>
@@ -228,7 +235,7 @@
<links>
<link>http://jackson.codehaus.org/${jackson.version}/javadoc/</link>
<link>http://java.sun.com/products/servlet/2.3/javadoc/</link>
- <link>http://hadoop.apache.org/common/docs/r${hadoop.version}/api/</link>
+ <link>http://hadoop.apache.org/common/docs/r${hadoop1.version}/api/</link>
</links>
</configuration>
</plugin>
@@ -345,7 +352,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>${hadoop.version}</version>
+ <version>${hadoop1.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
Modified: avro/trunk/lang/java/trevni/avro/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/trevni/avro/pom.xml?rev=1391001&r1=1391000&r2=1391001&view=diff
==============================================================================
--- avro/trunk/lang/java/trevni/avro/pom.xml (original)
+++ avro/trunk/lang/java/trevni/avro/pom.xml Thu Sep 27 13:25:47 2012
@@ -59,7 +59,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>${hadoop.version}</version>
+ <version>${hadoop1.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>