You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@avro.apache.org by cu...@apache.org on 2010/06/15 20:21:19 UTC
svn commit: r954998 [1/2] - in /avro/trunk: ./ lang/java/
lang/java/src/java/org/apache/avro/file/
lang/java/src/java/org/apache/avro/generic/
lang/java/src/java/org/apache/avro/io/
lang/java/src/java/org/apache/avro/ipc/ lang/java/src/java/org/apache/...
Author: cutting
Date: Tue Jun 15 18:21:18 2010
New Revision: 954998
URL: http://svn.apache.org/viewvc?rev=954998&view=rev
Log:
AVRO-512. Java: Define and implement MapReduce connector protocols.
Added:
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
avro/trunk/share/schemas/org/apache/avro/mapred/
avro/trunk/share/schemas/org/apache/avro/mapred/tether/
avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr
avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/build.xml
avro/trunk/lang/java/ivy.xml
avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java
avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java
avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java
avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java
avro/trunk/lang/java/src/test/bin/test_tools.sh
avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java
avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jun 15 18:21:18 2010
@@ -10,6 +10,9 @@ Avro 1.4.0 (unreleased)
AVRO-285: Specify one-way messages and implement in Java. (cutting)
+ AVRO-512. Java: Define and implement MapReduce connector
+ protocols. (cutting)
+
IMPROVEMENTS
AVRO-501. missing function in C api to access array elements after
Modified: avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Tue Jun 15 18:21:18 2010
@@ -151,7 +151,8 @@
<target name="compile" depends="javacc,ivy-retrieve">
<java-compiler
- excludes="**/ipc/** **/*Requestor.java **/*Responder.java **/tool/**">
+ excludes="**/ipc/** **/*Requestor.java **/*Responder.java
+ **/tool/** **/mapred/**">
<src path="${build.dir}/src"/>
<src path="${java.src.dir}"/>
</java-compiler>
@@ -415,6 +416,24 @@
<test-runner files.location="${test.java.classes}" tests.pattern="**/TestProtocolSpecific$InteropTest.class" />
</target>
+ <target name="tether-wordcount-jar" depends="compile-test-java"
+ description="Build tether wordcount jar file">
+ <jar jarfile="${test.java.build.dir}/wordcount.jar">
+ <manifest>
+ <attribute name="Main-Class"
+ value="org.apache.avro.mapred.tether.WordCountTask"/>
+ </manifest>
+ <fileset dir="${build.classes}" />
+ <zipgroupfileset dir="${ivy.lib}" includes="jackson*.jar"/>
+ <zipgroupfileset dir="${ivy.test.lib}" includes="slf4j*.jar"/>
+ <fileset dir="${test.java.classes}"
+ includes="org/apache/avro/mapred/tether/*"/>
+ <fileset dir="${test.java.generated.classes}"
+ includes="org/apache/avro/mapred/*"/>
+ </jar>
+ <chmod file="${test.java.build.dir}/wordcount.jar" perm="ugo+x"/>
+ </target>
+
<target name="tools" depends="compile,ivy-retrieve-tools"
description="Build standalone tools jar file">
<jar jarfile="${build.dir}/avro-tools-${version}.jar">
@@ -430,13 +449,14 @@
<chmod file="${build.dir}/avro-tools-${version}.jar" perm="ugo+x"/>
</target>
- <target name="test-tools" depends="tools,compile-test-java"
+ <target name="test-tools" depends="tools,unit-test-java,tether-wordcount-jar"
description="Tests tools">
<exec executable="${basedir}/src/test/bin/test_tools.sh"
failonerror="true">
<env key="TOOLS" value="${build.dir}/avro-tools-${version}.jar"/>
<env key="TMPDIR" value="${test.java.build.dir}/tools"/>
<env key="JAVA_HOME" value="${java.home}"/>
+ <env key="HADOOP_LOG_DIR" value="${build.dir}/logs"/>
</exec>
</target>
Modified: avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ivy.xml?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/ivy.xml (original)
+++ avro/trunk/lang/java/ivy.xml Tue Jun 15 18:21:18 2010
@@ -18,10 +18,10 @@
<ivy-module version="2.0"
xmlns:e="http://ant.apache.org/ivy/extra">
- <info organisation="org.apache.hadoop"
+ <info organisation="org.apache.avro"
module="${name}" revision="${version}">
<license name="Apache 2.0"/>
- <ivyauthor name="Apache Hadoop" url="http://hadoop.apache.org"/>
+ <ivyauthor name="Apache Avro" url="http://avro.apache.org"/>
<description>Avro</description>
</info>
@@ -62,9 +62,9 @@
<dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
conf="build->default;test->default;tools->default"/>
<dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"
- conf="build->default" transitive="false"/>
+ conf="build->default;tools->default" transitive="false"/>
<dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
- conf="test->default"/>
+ conf="test->default;tools->default"/>
</dependencies>
</ivy-module>
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Tue Jun 15 18:21:18 2010
@@ -54,6 +54,8 @@ public class DataFileStream<D> implement
Map<String,byte[]> meta = new HashMap<String,byte[]>();
+ ByteBuffer blockBuffer;
+ long blockCount; // # entries in block
long blockRemaining; // # entries remaining in block
byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
@@ -159,7 +161,7 @@ public class DataFileStream<D> implement
}
if (hasNextBlock()) {
block = nextBlock(block);
- ByteBuffer blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
+ blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
blockBuffer = codec.decompress(blockBuffer);
datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
blockBuffer.array(), blockBuffer.arrayOffset() +
@@ -199,6 +201,20 @@ public class DataFileStream<D> implement
return result;
}
+ /** Expert: Return the next block in the file, as binary-encoded data. */
+ public ByteBuffer nextBlock() throws IOException {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ if (blockRemaining != blockCount)
+ throw new IllegalStateException("Not at block start.");
+ blockRemaining = 0;
+ datumIn = null;
+ return blockBuffer;
+ }
+
+ /** Expert: Return the count of items in the current block. */
+ public long getBlockCount() { return blockCount; }
+
protected void blockFinished() throws IOException {
// nothing for the stream impl
}
@@ -214,6 +230,7 @@ public class DataFileStream<D> implement
throw new IOException("Block size invalid or too large for this " +
"implementation: " + blockSize);
}
+ blockCount = blockRemaining;
availableBlock = true;
return true;
} catch (EOFException eof) {
Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Tue Jun 15 18:21:18 2010
@@ -248,6 +248,18 @@ public class DataFileWriter<D> implement
writeBlock();
}
+ /** Expert: Append a pre-encoded datum to the file. No validation is
+ * performed to check that the encoding conforms to the file's schema.
+ * Appending non-conforming data may result in an unreadable file. */
+ public void appendEncoded(ByteBuffer datum) throws IOException {
+ assertOpen();
+ int start = datum.position();
+ buffer.write(datum.array(), start, datum.limit()-start);
+ blockCount++;
+ if (buffer.size() >= syncInterval)
+ writeBlock();
+ }
+
/**
* Appends data from another file. otherFile must have the same schema.
* Data blocks will be copied without de-serializing data. If the codecs
Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java Tue Jun 15 18:21:18 2010
@@ -441,6 +441,8 @@ public class GenericData {
return hashCode;
case UNION:
return hashCode(o, s.getTypes().get(resolveUnion(s, o)));
+ case ENUM:
+ return s.getEnumOrdinal(o.toString());
case NULL:
return 0;
default:
Modified: avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java Tue Jun 15 18:21:18 2010
@@ -184,4 +184,111 @@ public class BinaryData {
return l1 - l2;
}
+ private static class HashData {
+ private final BufferAccessor bytes;
+ private final BinaryDecoder decoder;
+ public HashData() {
+ this.decoder = new BinaryDecoder(new byte[0], 0, 0);
+ this.bytes = decoder.getBufferAccessor();
+ }
+ public void set(byte[] bytes, int start, int len) {
+ this.decoder.init(bytes, start, len);
+ }
+ }
+
+ private static final ThreadLocal<HashData> HASH_DATA
+ = new ThreadLocal<HashData>() {
+ @Override protected HashData initialValue() { return new HashData(); }
+ };
+
+ /** Hash binary encoded data. Consistent with {@link
+ * org.apache.avro.generic.GenericData#hashCode(Object, Schema)}.*/
+ public static int hashCode(byte[] bytes, int start, int length,
+ Schema schema) {
+ HashData data = HASH_DATA.get();
+ data.set(bytes, start, length);
+ try {
+ return hashCode(data, schema);
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+
+ private static int hashCode(HashData data, Schema schema)
+ throws IOException {
+ Decoder decoder = data.decoder;
+ switch (schema.getType()) {
+ case RECORD: {
+ int hashCode = 1;
+ for (Field field : schema.getFields()) {
+ if (field.order() == Field.Order.IGNORE) {
+ GenericDatumReader.skip(field.schema(), decoder);
+ continue;
+ }
+ hashCode = hashCode*31 + hashCode(data, field.schema());
+ }
+ return hashCode;
+ }
+ case ENUM: case INT:
+ return decoder.readInt();
+ case FLOAT:
+ return Float.floatToIntBits(decoder.readFloat());
+ case LONG: {
+ long l = decoder.readLong();
+ return (int)(l^(l>>>32));
+ }
+ case DOUBLE: {
+ long l = Double.doubleToLongBits(decoder.readDouble());
+ return (int)(l^(l>>>32));
+ }
+ case ARRAY: {
+ Schema elementType = schema.getElementType();
+ int hashCode = 1;
+ for (long l = decoder.readArrayStart(); l != 0; l = decoder.arrayNext())
+ for (long i = 0; i < l; i++)
+ hashCode = hashCode*31 + hashCode(data, elementType);
+ return hashCode;
+ }
+ case MAP:
+ throw new AvroRuntimeException("Can't hashCode maps!");
+ case UNION:
+ return hashCode(data, schema.getTypes().get(decoder.readInt()));
+ case FIXED:
+ return hashBytes(1, data, schema.getFixedSize(), false);
+ case STRING:
+ return hashBytes(0, data, decoder.readInt(), false);
+ case BYTES:
+ return hashBytes(1, data, decoder.readInt(), true);
+ case BOOLEAN:
+ return decoder.readBoolean() ? 1231 : 1237;
+ case NULL:
+ return 0;
+ default:
+ throw new AvroRuntimeException("Unexpected schema to hashCode!");
+ }
+ }
+
+ private static int hashBytes(int init, HashData data, int len, boolean rev)
+ throws IOException {
+ int hashCode = init;
+ byte[] bytes = data.bytes.getBuf();
+ int start = data.bytes.getPos();
+ int end = start+len;
+ if (rev)
+ for (int i = end-1; i >= start; i--)
+ hashCode = hashCode*31 + bytes[i];
+ else
+ for (int i = start; i < end; i++)
+ hashCode = hashCode*31 + bytes[i];
+ data.decoder.skipFixed(len);
+ return hashCode;
+ }
+
+ /** Skip a binary-encoded long, returning the position after it. */
+ public static int skipLong(byte[] bytes, int start) {
+ int i = start;
+ for (int b = bytes[i++]; ((b & 0x80) != 0); b = bytes[i++]) {}
+ return i;
+ }
+
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java Tue Jun 15 18:21:18 2010
@@ -76,16 +76,14 @@ public class ByteBufferOutputStream exte
buffer.put(b, off, len);
}
- /** Add a buffer to the output without copying, if possible.
- * Sets buffer's position to its limit.
- */
+ /** Add a buffer to the output without copying, if possible. */
public void writeBuffer(ByteBuffer buffer) throws IOException {
if (buffer.remaining() < BUFFER_SIZE) {
write(buffer.array(), buffer.position(), buffer.remaining());
- } else {
- buffers.add(buffer); // append w/o copying
+ } else { // append w/o copying bytes
+ ByteBuffer dup = buffer.duplicate();
+ dup.position(buffer.limit()); // ready for flip
+ buffers.add(dup);
}
- buffer.position(buffer.limit()); // mark data as consumed
}
}
-
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Tue Jun 15 18:21:18 2010
@@ -21,6 +21,7 @@ package org.apache.avro.mapred;
import java.util.Collection;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.avro.Schema;
@@ -32,13 +33,15 @@ public class AvroJob {
static final String API_SPECIFIC = "specific";
static final String INPUT_API = "avro.input.api";
- static final String INPUT_SCHEMA = "avro.input.schema";
-
- static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+ static final String OUTPUT_API = "avro.output.api";
static final String MAP_OUTPUT_API = "avro.map.output.api";
- static final String OUTPUT_SCHEMA = "avro.output.schema";
- static final String OUTPUT_API = "avro.output.api";
+ /** The configuration key for a job's input schema. */
+ public static final String INPUT_SCHEMA = "avro.input.schema";
+ /** The configuration key for a job's intermediate schema. */
+ public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+ /** The configuration key for a job's output schema. */
+ public static final String OUTPUT_SCHEMA = "avro.output.schema";
/** Configure a job's map input to use Avro's generic API. */
public static void setInputGeneric(JobConf job, Schema s) {
@@ -59,32 +62,53 @@ public class AvroJob {
/** Configure a job's map output key schema using Avro's generic API. */
public static void setMapOutputGeneric(JobConf job, Schema s) {
- job.set(MAP_OUTPUT_SCHEMA, s.toString());
job.set(MAP_OUTPUT_API, API_GENERIC);
+ setMapOutputSchema(job, s);
configureAvroOutput(job);
}
/** Configure a job's map output key schema using Avro's specific API. */
public static void setMapOutputSpecific(JobConf job, Schema s) {
- job.set(MAP_OUTPUT_SCHEMA, s.toString());
job.set(MAP_OUTPUT_API, API_SPECIFIC);
+ setMapOutputSchema(job, s);
configureAvroOutput(job);
}
/** Configure a job's output key schema using Avro's generic API. */
public static void setOutputGeneric(JobConf job, Schema s) {
- job.set(OUTPUT_SCHEMA, s.toString());
job.set(OUTPUT_API, API_GENERIC);
+ setOutputSchema(job, s);
configureAvroOutput(job);
}
/** Configure a job's output key schema using Avro's specific API. */
public static void setOutputSpecific(JobConf job, Schema s) {
- job.set(OUTPUT_SCHEMA, s.toString());
job.set(OUTPUT_API, API_SPECIFIC);
+ setOutputSchema(job, s);
configureAvroOutput(job);
}
+ /** Set a job's map output key schema. */
+ public static void setMapOutputSchema(JobConf job, Schema s) {
+ job.set(MAP_OUTPUT_SCHEMA, s.toString());
+ }
+
+ /** Return a job's map output key schema. */
+ public static Schema getMapOutputSchema(Configuration job) {
+ return Schema.parse(job.get(AvroJob.MAP_OUTPUT_SCHEMA,
+ job.get(AvroJob.OUTPUT_SCHEMA)));
+ }
+
+ /** Set a job's output key schema. */
+ public static void setOutputSchema(JobConf job, Schema s) {
+ job.set(OUTPUT_SCHEMA, s.toString());
+ }
+
+ /** Return a job's output key schema. */
+ public static Schema getOutputSchema(Configuration job) {
+ return Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+ }
+
private static void configureAvroOutput(JobConf job) {
job.setOutputKeyClass(AvroWrapper.class);
job.setOutputKeyComparatorClass(AvroKeyComparator.class);
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java Tue Jun 15 18:21:18 2010
@@ -38,8 +38,7 @@ public class AvroKeyComparator<T>
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf != null) {
- schema = Schema.parse(conf.get(AvroJob.MAP_OUTPUT_SCHEMA,
- conf.get(AvroJob.OUTPUT_SCHEMA)));
+ schema = AvroJob.getMapOutputSchema(conf);
String api = getConf().get(AvroJob.MAP_OUTPUT_API,
getConf().get(AvroJob.OUTPUT_API));
model = AvroJob.API_SPECIFIC.equals(api)
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java Tue Jun 15 18:21:18 2010
@@ -52,10 +52,7 @@ public class AvroKeySerialization<T> ext
// We need not rely on mapred.task.is.map here to determine whether map
// output or final output is desired, since the mapreduce framework never
// creates a deserializer for final output, only for map output.
- String json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA,
- getConf().get(AvroJob.OUTPUT_SCHEMA));
- Schema schema = Schema.parse(json);
-
+ Schema schema = AvroJob.getMapOutputSchema(getConf());
String api = getConf().get(AvroJob.MAP_OUTPUT_API,
getConf().get(AvroJob.OUTPUT_API));
DatumReader<T> reader = AvroJob.API_SPECIFIC.equals(api)
@@ -105,14 +102,13 @@ public class AvroKeySerialization<T> ext
// or final output is needed.
boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
- String json = getConf().get(AvroJob.OUTPUT_SCHEMA);
- if (isMap)
- json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA, json);
- Schema schema = Schema.parse(json);
+ Schema schema = isMap
+ ? AvroJob.getMapOutputSchema(getConf())
+ : AvroJob.getOutputSchema(getConf());
String api = getConf().get(AvroJob.OUTPUT_API);
if (isMap)
- api = getConf().get(AvroJob.MAP_OUTPUT_API, json);
+ api = getConf().get(AvroJob.MAP_OUTPUT_API, api);
DatumWriter<T> writer = AvroJob.API_SPECIFIC.equals(api)
? new SpecificDatumWriter<T>(schema)
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Tue Jun 15 18:21:18 2010
@@ -40,10 +40,14 @@ import org.apache.avro.file.CodecFactory
public class AvroOutputFormat <T>
extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
- final static String EXT = ".avro";
+ /** The file name extension for avro data files. */
+ public final static String EXT = ".avro";
- private static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
- private static final int DEFAULT_DEFLATE_LEVEL = 1;
+ /** The configuration key for Avro deflate level. */
+ public static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+
+ /** The default deflate level. */
+ public static final int DEFAULT_DEFLATE_LEVEL = 1;
/** Enable output compression using the deflate codec and specify its level.*/
public static void setDeflateLevel(JobConf job, int level) {
@@ -56,7 +60,7 @@ public class AvroOutputFormat <T>
String name, Progressable prog)
throws IOException {
- Schema schema = Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+ Schema schema = AvroJob.getOutputSchema(job);
DatumWriter<T> datumWriter =
AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))
Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Tue Jun 15 18:21:18 2010
@@ -18,8 +18,8 @@
-->
<body>
-Tools to permit using Avro data
-with <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs.
+Run <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs over
+Avro data, with map and reduce functions written in Java.
<p>Avro data files do not contain key/value pairs as expected by
Hadoop's MapReduce API, but rather just a sequence of values. Thus
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+/** A wrapper for a ByteBuffer containing binary-encoded data. */
+class TetherData {
+ private int count = 1; // only used for task input
+ private ByteBuffer buffer;
+
+ public TetherData() {}
+ public TetherData(ByteBuffer buffer) { this.buffer = buffer; }
+
+ /** Return the count of records in the buffer. Used for task input only.*/
+ public int count() { return count; }
+
+ /** Set the count of records in the buffer. Used for task input only. */
+ public void count(int count) { this.count = count; }
+
+ /** Return the buffer. */
+ public ByteBuffer buffer() { return buffer; }
+
+ /** Set the buffer. */
+ public void buffer(ByteBuffer buffer) { this.buffer = buffer; }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.mapred.AvroOutputFormat;
+
+/** An {@link org.apache.hadoop.mapred.InputFormat} for tethered Avro input. */
+class TetherInputFormat
+ extends FileInputFormat<TetherData, NullWritable> {
+
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job))
+ if (file.getPath().getName().endsWith(AvroOutputFormat.EXT))
+ result.add(file);
+ return result.toArray(new FileStatus[0]);
+ }
+
+ @Override
+ public RecordReader<TetherData, NullWritable>
+ getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ reporter.setStatus(split.toString());
+ return new TetherRecordReader(job, (FileSplit)split);
+ }
+
+}
+
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.File;
+import java.util.List;
+import java.util.Collection;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.avro.Schema;
+import org.apache.avro.tool.Tool;
+import org.apache.avro.mapred.AvroJob;
+
+/** Constructs and submits tether jobs. This may either be used as a
+ * commandline-based or API-based method to launch tether jobs. */
+public class TetherJob extends Configured implements Tool {
+
+  /** Configuration key under which the executable's URI is stored. */
+  private static final String EXECUTABLE_KEY = "avro.tether.executable";
+
+  /** Get the URI of the application's executable.
+   * @throws IllegalArgumentException if no executable has been set in the job
+   * @throws RuntimeException wrapping a {@link URISyntaxException} if the
+   *         configured value is not a valid URI */
+  public static URI getExecutable(JobConf job) {
+    String executable = job.get(EXECUTABLE_KEY);
+    if (executable == null)                       // fail clearly, not with an NPE
+      throw new IllegalArgumentException("No tether executable set in job ("
+                                         + EXECUTABLE_KEY + ")");
+    try {
+      return new URI(executable);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Set the URI for the application's executable. Normally this is in HDFS. */
+  public static void setExecutable(JobConf job, URI executable) {
+    job.set(EXECUTABLE_KEY, executable.toString());
+  }
+
+  /** Submit a job to the map/reduce cluster. All of the necessary
+   * modifications to the job to run under tether are made to the
+   * configuration.
+   */
+  public static RunningJob runJob(JobConf job) throws IOException {
+    setupTetherJob(job);
+    return JobClient.runJob(job);
+  }
+
+  /** Submit a job to the Map-Reduce framework. */
+  public static RunningJob submitJob(JobConf conf) throws IOException {
+    setupTetherJob(conf);
+    return new JobClient(conf).submitJob(conf);
+  }
+
+  /** Installs the tether map runner, partitioner, reducer, input/output
+   * formats, key comparator and key serialization into the job, and adds the
+   * executable to the distributed cache. */
+  private static void setupTetherJob(JobConf job) throws IOException {
+    job.setMapRunnerClass(TetherMapRunner.class);
+    job.setPartitionerClass(TetherPartitioner.class);
+    job.setReducerClass(TetherReducer.class);
+
+    job.setInputFormat(TetherInputFormat.class);
+    job.setOutputFormat(TetherOutputFormat.class);
+
+    job.setOutputKeyClass(TetherData.class);
+    job.setOutputKeyComparatorClass(TetherKeyComparator.class);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    // add TetherKeySerialization to io.serializations
+    Collection<String> serializations =
+      job.getStringCollection("io.serializations");
+    if (!serializations.contains(TetherKeySerialization.class.getName())) {
+      serializations.add(TetherKeySerialization.class.getName());
+      job.setStrings("io.serializations",
+                     serializations.toArray(new String[0]));
+    }
+
+    DistributedCache.addCacheFile(getExecutable(job), job);
+  }
+
+  // Tool methods
+
+  @Override
+  public String getName() { return "tether"; }
+
+  @Override
+  public String getShortDescription() {return "Run a tethered mapreduce job.";}
+
+  /** Parses commandline options into a job and runs it.
+   * @return 0 on success, -1 if the options could not be applied */
+  @Override
+  public int run(InputStream ins, PrintStream outs, PrintStream err,
+                 List<String> args) throws Exception {
+
+    OptionParser p = new OptionParser();
+    OptionSpec<URI> exec =
+      p.accepts("program", "executable program, usually in HDFS")
+      .withRequiredArg().ofType(URI.class);
+    OptionSpec<String> in = p.accepts("in", "comma-separated input paths")
+      .withRequiredArg().ofType(String.class);
+    OptionSpec<Path> out = p.accepts("out", "output directory")
+      .withRequiredArg().ofType(Path.class);
+    OptionSpec<File> outSchema = p.accepts("outschema", "output schema file")
+      .withRequiredArg().ofType(File.class);
+    OptionSpec<File> mapOutSchema =
+      p.accepts("outschemamap", "map output schema file, if different")
+      .withOptionalArg().ofType(File.class);
+    OptionSpec<Integer> reduces = p.accepts("reduces", "number of reduces")
+      .withOptionalArg().ofType(Integer.class);
+
+    JobConf job = new JobConf();
+
+    try {
+      OptionSet opts = p.parse(args.toArray(new String[0]));
+      FileInputFormat.addInputPaths(job, in.value(opts));
+      FileOutputFormat.setOutputPath(job, out.value(opts));
+      TetherJob.setExecutable(job, exec.value(opts));
+      AvroJob.setOutputSchema(job, Schema.parse(outSchema.value(opts)));
+      if (opts.hasArgument(mapOutSchema))
+        AvroJob.setMapOutputSchema(job, Schema.parse(mapOutSchema.value(opts)));
+      if (opts.hasArgument(reduces))
+        job.setNumReduceTasks(reduces.value(opts));
+    } catch (Exception e) {
+      // missing or malformed options (including absent required ones)
+      p.printHelpOn(err);
+      return -1;
+    }
+
+    runJob(job);
+    return 0;
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+
+/** The {@link RawComparator} used by jobs configured with {@link TetherJob}.
+ * Keys are ordered according to the job's map output schema. */
+class TetherKeyComparator
+  extends Configured implements RawComparator<TetherData> {
+
+  /** Map output schema used to interpret the key bytes. */
+  private Schema schema;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf != null)
+      schema = AvroJob.getMapOutputSchema(conf);
+  }
+
+  /** Compares two serialized keys.  Each key, as written by
+   * TetherKeySerialization, starts with a long length prefix that is skipped
+   * before the datum bytes are compared.
+   *
+   * Equality is never reported (0 is mapped to -1).  TetherReducer forwards
+   * only the key of each reduce() call and ignores the value iterator, so
+   * grouping equal keys would drop all but one of them.  NOTE(review): this
+   * deliberately breaks the Comparator contract (compare(x, x) != 0). */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int diff = BinaryData.compare(b1, BinaryData.skipLong(b1, s1),
+                                  b2, BinaryData.skipLong(b2, s2),
+                                  schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+  /** Compares two deserialized keys; see the raw form for why 0 is never
+   * returned.  Array indices include arrayOffset() so that buffers that do
+   * not start at index 0 of their backing array compare correctly. */
+  @Override
+  public int compare(TetherData x, TetherData y) {
+    ByteBuffer b1 = x.buffer(), b2 = y.buffer();
+    int diff = BinaryData.compare(b1.array(), b1.arrayOffset() + b1.position(),
+                                  b2.array(), b2.arrayOffset() + b2.position(),
+                                  schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+/** A {@link Serialization} for {@link TetherData}.  Each datum is written as
+ * Avro {@code bytes}, i.e. a length followed by the datum's buffer. */
+class TetherKeySerialization
+  extends Configured implements Serialization<TetherData> {
+
+  public boolean accept(Class<?> c) {
+    return TetherData.class.isAssignableFrom(c);
+  }
+
+  public Deserializer<TetherData> getDeserializer(Class<TetherData> c) {
+    return new TetherDataDeserializer();
+  }
+
+  /** Shared decoder factory with direct decoding enabled (presumably to avoid
+   * read-ahead buffering past the current datum — confirm). */
+  private static final DecoderFactory FACTORY = new DecoderFactory();
+  static { FACTORY.configureDirectDecoder(true); }
+
+  /** Reads {@link TetherData}; static because it needs no outer instance. */
+  private static class TetherDataDeserializer
+    implements Deserializer<TetherData> {
+    private BinaryDecoder decoder;
+
+    public void open(InputStream in) {
+      // passes the previous decoder, if any, so the factory may reuse it
+      this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+    }
+
+    public TetherData deserialize(TetherData datum) throws IOException {
+      if (datum == null) datum = new TetherData();
+      // hand over the existing buffer, presumably so it can be reused
+      datum.buffer(decoder.readBytes(datum.buffer()));
+      return datum;
+    }
+
+    public void close() throws IOException {
+      if (decoder != null)                        // open() may never have run
+        decoder.inputStream().close();
+    }
+  }
+
+  public Serializer<TetherData> getSerializer(Class<TetherData> c) {
+    return new TetherDataSerializer();
+  }
+
+  /** Writes {@link TetherData}; static because it needs no outer instance. */
+  private static class TetherDataSerializer
+    implements Serializer<TetherData> {
+
+    private OutputStream out;
+    private BinaryEncoder encoder;
+
+    public void open(OutputStream out) {
+      this.out = out;
+      this.encoder = new BinaryEncoder(out);
+    }
+
+    public void serialize(TetherData datum) throws IOException {
+      encoder.writeBytes(datum.buffer());
+    }
+
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.mapred.AvroJob;
+
+/** Runs the map side of a tethered job.  Each block read from the input is
+ * passed to a tethered subprocess, which then reports completion or failure. */
+class TetherMapRunner
+  extends MapRunner<TetherData, NullWritable, TetherData, NullWritable> {
+
+  static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+
+  private JobConf job;
+  private TetheredProcess process;
+
+  @Override
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  /** Starts the tethered process, streams all input to it and waits for it
+   * to finish.
+   * @throws IOException if the process reports failure or any step fails */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void run(RecordReader<TetherData, NullWritable> recordReader,
+                  OutputCollector<TetherData, NullWritable> collector,
+                  Reporter reporter) throws IOException {
+    try {
+      // start tethered process
+      process = new TetheredProcess(job, collector, reporter);
+
+      // configure it
+      process.inputClient.configure
+        (TaskType.MAP,
+         new Utf8(job.get(AvroJob.INPUT_SCHEMA)),
+         new Utf8(AvroJob.getMapOutputSchema(job).toString()));
+
+      process.inputClient.partitions(job.getNumReduceTasks());
+
+      // run map
+      Counter inputRecordCounter =
+        reporter.getCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "MAP_INPUT_RECORDS");
+      TetherData data = new TetherData();
+      while (recordReader.next(data, NullWritable.get())) {
+        process.inputClient.input(data.buffer(), data.count());
+        // each next() yields data.count() records; count-1 is added on the
+        // assumption that the framework already counts one per next() call
+        // TODO(review): confirm
+        inputRecordCounter.increment(data.count()-1);
+        if (process.outputService.isFinished())
+          break;
+      }
+      process.inputClient.complete();
+
+      // wait for completion
+      if (process.outputService.waitForFinish())
+        throw new IOException("Task failed: "+process.outputService.error());
+
+    } catch (Throwable t) {                       // send abort
+      LOG.warn("Task failed", t);
+      if (t instanceof InterruptedException)
+        Thread.currentThread().interrupt();       // preserve interrupt status
+      abortQuietly();
+      throw new IOException("Task failed: "+t, t);
+
+    } finally {                                   // clean up
+      if (process != null)
+        process.close();
+    }
+  }
+
+  /** Best-effort abort of the tethered process.  Does nothing if the process
+   * was never started, and only logs a failure to abort so it cannot mask the
+   * error that triggered the abort. */
+  private void abortQuietly() {
+    if (process == null)
+      return;
+    try {
+      process.inputClient.abort();
+    } catch (Throwable t) {
+      LOG.warn("Failed to abort tethered process", t);
+    }
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroOutputFormat;
+
+/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+class TetherOutputFormat
+  extends FileOutputFormat<TetherData, NullWritable> {
+
+  /** Enable output compression using the deflate codec and specify its level.*/
+  public static void setDeflateLevel(JobConf job, int level) {
+    FileOutputFormat.setCompressOutput(job, true);
+    job.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, level);
+  }
+
+  /** Creates this task's output data file, written with the job's output
+   * schema.  The file is deflate-compressed when output compression is
+   * enabled.  Each {@link TetherData} is appended as already-encoded bytes. */
+  public RecordWriter<TetherData, NullWritable>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    Schema schema = AvroJob.getOutputSchema(job);
+
+    final DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+
+    if (FileOutputFormat.getCompressOutput(job)) {
+      int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY,
+                             AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      writer.setCodec(CodecFactory.deflateCodec(level));
+    }
+
+    Path path =
+      FileOutputFormat.getTaskOutputPath(job, name+AvroOutputFormat.EXT);
+    writer.create(schema, path.getFileSystem(job).create(path));
+
+    return new RecordWriter<TetherData, NullWritable>() {
+      public void write(TetherData datum, NullWritable ignore)
+        throws IOException {
+        writer.appendEncoded(datum.buffer());
+      }
+      public void close(Reporter reporter) throws IOException {
+        writer.close();
+      }
+    };
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.util.Utf8;
+
+class TetherOutputService implements OutputProtocol {
+ private Reporter reporter;
+ private OutputCollector<TetherData, NullWritable> collector;
+ private int inputPort;
+ private boolean complete;
+ private String error;
+
+ public TetherOutputService(OutputCollector<TetherData,NullWritable> collector,
+ Reporter reporter) {
+ this.reporter = reporter;
+ this.collector = collector;
+ }
+
+ public synchronized void configure(int inputPort) {
+ TetherMapRunner.LOG.info("got input port from child");
+ this.inputPort = inputPort;
+ notify();
+ }
+
+ public synchronized int inputPort() throws InterruptedException {
+ while (inputPort == 0) {
+ TetherMapRunner.LOG.info("waiting for input port from child");
+ wait();
+ }
+ return inputPort;
+ }
+
+ public void output(ByteBuffer datum) {
+ try {
+ collector.collect(new TetherData(datum), NullWritable.get());
+ } catch (Throwable e) {
+ TetherMapRunner.LOG.warn("Error: "+e, e);
+ synchronized (this) {
+ error = e.toString();
+ }
+ }
+ }
+
+ public void outputPartitioned(int partition, ByteBuffer datum) {
+ TetherPartitioner.setNextPartition(partition);
+ output(datum);
+ }
+
+ public void status(Utf8 message) { reporter.setStatus(message.toString()); }
+
+
+ public void count(Utf8 group, Utf8 name, long amount) {
+ reporter.getCounter(group.toString(), name.toString()).increment(amount);
+ }
+
+ public synchronized void fail(Utf8 message) {
+ TetherMapRunner.LOG.warn("Failing: "+message);
+ error = message.toString();
+ notify();
+ }
+
+ public synchronized void complete() {
+ TetherMapRunner.LOG.info("got task complete");
+ complete = true;
+ notify();
+ }
+
+ public synchronized boolean isFinished() {
+ return complete || (error != null);
+ }
+
+ public String error() { return error; }
+
+ public synchronized boolean waitForFinish() throws InterruptedException {
+ while (!isFinished())
+ wait();
+ return error != null;
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+
+/** Partitions map output for tethered jobs.  A partition requested through
+ * {@link #setNextPartition(int)} on the current thread is used for the next
+ * key only; otherwise the key is hashed according to the map output schema. */
+class TetherPartitioner implements Partitioner<TetherData, NullWritable> {
+
+  /** Partition requested for the next key partitioned on this thread. */
+  private static final ThreadLocal<Integer> CACHE = new ThreadLocal<Integer>();
+
+  private Schema schema;
+
+  public void configure(JobConf job) {
+    schema = AvroJob.getMapOutputSchema(job);
+  }
+
+  /** Forces the partition returned by the next {@link #getPartition} call on
+   * the current thread. */
+  static void setNextPartition(int newValue) {
+    CACHE.set(newValue);
+  }
+
+  public int getPartition(TetherData key, NullWritable value,
+                          int numPartitions) {
+    Integer result = CACHE.get();
+    if (result != null) {                         // use requested value once,
+      CACHE.remove();                             // not for later keys too
+      return result;
+    }
+
+    ByteBuffer b = key.buffer();
+    int p = b.position();
+    // array indices must include arrayOffset() for offset/sliced buffers
+    int hashCode = BinaryData.hashCode(b.array(), b.arrayOffset() + p,
+                                       b.limit() - p, schema);
+    // Math.abs(Integer.MIN_VALUE) is still negative, so map it explicitly
+    int positive = hashCode == Integer.MIN_VALUE ? 0 : Math.abs(hashCode);
+    return positive % numPartitions;
+  }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.FsInput;
+
+/** Reads an Avro data file split a block at a time.  Each key holds the
+ * encoded bytes of one block together with that block's record count. */
+class TetherRecordReader
+  implements RecordReader<TetherData, NullWritable> {
+
+  private FsInput in;
+  private DataFileReader<Object> reader;
+  private long start;
+  private long end;
+
+  /** Opens the split's file and syncs to the first block at or after the
+   * split start.  The file's schema is stored in the job under
+   * {@link AvroJob#INPUT_SCHEMA}, where the map runner reads it. */
+  public TetherRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    this.in = new FsInput(split.getPath(), job);
+    try {
+      this.reader =
+        new DataFileReader<Object>(in, new GenericDatumReader<Object>());
+      reader.sync(split.getStart());              // sync to start
+    } catch (IOException e) {
+      in.close();                                 // don't leak the open file
+      throw e;
+    } catch (RuntimeException e) {
+      in.close();
+      throw e;
+    }
+    this.start = in.tell();
+    this.end = split.getStart() + split.getLength();
+
+    job.set(AvroJob.INPUT_SCHEMA, reader.getSchema().toString());
+  }
+
+  public Schema getSchema() { return reader.getSchema(); }
+
+  public TetherData createKey() { return new TetherData(); }
+
+  public NullWritable createValue() { return NullWritable.get(); }
+
+  /** Loads the next whole block into {@code data}.  Returns false at end of
+   * file or once past the split's end. */
+  public boolean next(TetherData data, NullWritable ignore)
+    throws IOException {
+    if (!reader.hasNext() || reader.pastSync(end))
+      return false;
+    data.buffer(reader.nextBlock());
+    data.count((int)reader.getBlockCount());
+    return true;
+  }
+
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return in.tell();
+  }
+
+  public void close() throws IOException { reader.close(); }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.mapred.AvroJob;
+
+/** Runs the reduce side of a tethered job.  The tethered process is started
+ * on the first key, and every key is then forwarded to it. */
+class TetherReducer
+  implements Reducer<TetherData,NullWritable,TetherData,NullWritable> {
+
+  private JobConf job;
+  private TetheredProcess process;
+  private boolean error;
+
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  /** Forwards one key to the tethered process, starting it on first use.
+   * The value iterator is ignored. */
+  public void reduce(TetherData datum, Iterator<NullWritable> ignore,
+                     OutputCollector<TetherData, NullWritable> collector,
+                     Reporter reporter) throws IOException {
+    try {
+      if (process == null) {
+        process = new TetheredProcess(job, collector, reporter);
+        process.inputClient.configure
+          (TaskType.REDUCE,
+           new Utf8(AvroJob.getMapOutputSchema(job).toString()),
+           new Utf8(AvroJob.getOutputSchema(job).toString()));
+      }
+      process.inputClient.input(datum.buffer(), datum.count());
+    } catch (IOException e) {
+      error = true;
+      throw e;
+    } catch (Exception e) {
+      error = true;
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Handle the end of the input by closing down the application.
+   * @throws IOException if the tethered process reported a failure, or the
+   *         wait for it to finish was interrupted
+   */
+  public void close() throws IOException {
+    if (process == null) return;
+    try {
+      if (error)
+        process.inputClient.abort();
+      else
+        process.inputClient.complete();
+      // propagate a failure reported by the process, as TetherMapRunner does
+      if (process.outputService.waitForFinish())
+        throw new IOException("Task failed: "+process.outputService.error());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();         // preserve interrupt status
+      throw new IOException(e);
+    } finally {
+      process.close();
+    }
+  }
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+
+class TetheredProcess {
+
+ private JobConf job;
+
+ TetherOutputService outputService;
+ Server outputServer;
+ Process subprocess;
+ Transceiver clientTransceiver;
+ InputProtocol inputClient;
+
+ public TetheredProcess(JobConf job,
+ OutputCollector<TetherData, NullWritable> collector,
+ Reporter reporter) throws Exception {
+ try {
+ // start server
+ this.outputService = new TetherOutputService(collector, reporter);
+ this.outputServer = new SocketServer
+ (new SpecificResponder(OutputProtocol.class, outputService),
+ new InetSocketAddress(0));
+
+ // start sub-process, connecting back to server
+ this.subprocess = startSubprocess(job);
+
+ // open client, connecting to sub-process
+ this.clientTransceiver =
+ new SocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+ this.inputClient = (InputProtocol)
+ SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
+
+
+ } catch (Exception t) {
+ close();
+ throw t;
+ }
+ }
+
+ public void close() {
+ if (clientTransceiver != null)
+ try {
+ clientTransceiver.close();
+ } catch (IOException e) {} // ignore
+ if (subprocess != null)
+ subprocess.destroy();
+ if (outputServer != null)
+ outputServer.close();
+ }
+
+ private Process startSubprocess(JobConf job)
+ throws IOException, InterruptedException {
+ // get the executable command
+ List<String> command = new ArrayList<String>();
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
+ if (localFiles == null) { // until MAPREDUCE-476
+ URI[] files = DistributedCache.getCacheFiles(job);
+ localFiles = new Path[] { new Path(files[0].toString()) };
+ }
+ String executable = localFiles[0].toString();
+ FileUtil.chmod(executable, "a+x");
+ command.add(executable);
+
+ if (System.getProperty("hadoop.log.dir") == null
+ && System.getenv("HADOOP_LOG_DIR") != null)
+ System.setProperty("hadoop.log.dir", System.getenv("HADOOP_LOG_DIR"));
+
+ // wrap the command in a stdout/stderr capture
+ TaskAttemptID taskid = TaskAttemptID.forName(job.get("mapred.task.id"));
+ File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ long logLength = TaskLog.getTaskLogLength(job);
+ command = TaskLog.captureOutAndError(command, stdout, stderr, logLength);
+ stdout.getParentFile().mkdirs();
+ stderr.getParentFile().mkdirs();
+
+ // add output server's port to env
+ Map<String, String> env = new HashMap<String,String>();
+ env.put("AVRO_TETHER_OUTPUT_PORT",
+ Integer.toString(outputServer.getPort()));
+
+ // start child process
+ ProcessBuilder builder = new ProcessBuilder(command);
+ builder.environment().putAll(env);
+ return builder.start();
+ }
+
+}
Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html Tue Jun 15 18:21:18 2010
@@ -0,0 +1,32 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Run <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs over
+Avro data, with map and reduce functions run in a sub-process. This
+permits MapReduce programs over Avro data to be written in languages other than Java.
+<p>
+Each language will provide a framework to permit easy implementation
+of MapReduce programs in that language. Currently only a Java
+framework has been implemented, for test purposes, so this feature is
+not yet useful.
+<p>
+This is still an experimental API, subject to change.
+</body>
+</html>
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java Tue Jun 15 18:21:18 2010
@@ -89,6 +89,7 @@ public class SpecificResponder extends R
for (Schema.Field param: message.getRequest().getFields())
paramTypes[i++] = data.getClass(param.schema());
Method method = impl.getClass().getMethod(message.getName(), paramTypes);
+ method.setAccessible(true);
return method.invoke(impl, (Object[])request);
} catch (InvocationTargetException e) {
throw (Exception)e.getTargetException();
Modified: avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java Tue Jun 15 18:21:18 2010
@@ -25,6 +25,7 @@ import java.io.InputStream;
import org.apache.avro.reflect.InduceSchemaTool;
import org.apache.avro.specific.SpecificCompiler.SpecificCompilerTool;
+import org.apache.avro.mapred.tether.TetherJob;
/** Command-line driver.*/
public class Main {
@@ -47,7 +48,8 @@ public class Main {
new DataFileGetSchemaTool(),
new GenAvroTool(),
new RpcReceiveTool(),
- new RpcSendTool()
+ new RpcSendTool(),
+ new TetherJob()
}) {
Tool prev = tools.put(tool.getName(), tool);
if (prev != null) {
Modified: avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java Tue Jun 15 18:21:18 2010
@@ -80,9 +80,9 @@ public class Utf8 implements Comparable<
}
public int hashCode() {
- int hash = length;
+ int hash = 0;
for (int i = 0; i < this.length; i++)
- hash += bytes[i] & 0xFF;
+ hash = hash*31 + bytes[i];
return hash;
}
Modified: avro/trunk/lang/java/src/test/bin/test_tools.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/bin/test_tools.sh?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/bin/test_tools.sh (original)
+++ avro/trunk/lang/java/src/test/bin/test_tools.sh Tue Jun 15 18:21:18 2010
@@ -80,6 +80,12 @@ $CMD tojson $TMPDIR/data_file_write.avro
$CMD getschema $TMPDIR/data_file_write.avro \
| cmp -s - <(echo '"string"')
######################################################################
+# Test tethered mapred
+$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema ../../share/test/schemas/WordCount.avsc --program build/test/wordcount.jar
+$CMD tojson build/test/mapred/tout/part-00000.avro \
+ | cmp -s - <($CMD tojson build/test/mapred/out/part-00000.avro)
+
+######################################################################
$CMD 2>&1 | grep -q "Available tools:"
$CMD doesnotexist 2>&1 | grep -q "Available tools:"
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java Tue Jun 15 18:21:18 2010
@@ -54,8 +54,10 @@ public class TestCompare {
@Test
public void testString() throws Exception {
+ check("\"string\"", new Utf8(""), new Utf8("a"));
check("\"string\"", new Utf8("a"), new Utf8("b"));
check("\"string\"", new Utf8("a"), new Utf8("ab"));
+ check("\"string\"", new Utf8("ab"), new Utf8("b"));
}
@Test
@@ -219,6 +221,21 @@ public class TestCompare {
assert(!o2.equals(null));
assert(o1.hashCode() != o2.hashCode());
+
+ // check BinaryData.hashCode against Object.hashCode
+ if (schema.getType() != Schema.Type.ENUM) {
+ assertEquals(o1.hashCode(),
+ BinaryData.hashCode(b1, 0, b1.length, schema));
+ assertEquals(o2.hashCode(),
+ BinaryData.hashCode(b2, 0, b2.length, schema));
+ }
+
+ // check BinaryData.hashCode against GenericData.hashCode
+ assertEquals(comparator.hashCode(o1, schema),
+ BinaryData.hashCode(b1, 0, b1.length, schema));
+ assertEquals(comparator.hashCode(o2, schema),
+ BinaryData.hashCode(b2, 0, b2.length, schema));
+
}
@SuppressWarnings(value="unchecked")
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java Tue Jun 15 18:21:18 2010
@@ -79,29 +79,25 @@ public class TestWordCountSpecific {
String dir = System.getProperty("test.dir", ".") + "/mapred";
Path outputPath = new Path(dir + "/out");
- try {
- WordCountUtil.writeLinesFile();
-
- job.setJobName("wordcount");
-
- AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
- AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
-
- job.setMapperClass(MapImpl.class);
- job.setCombinerClass(ReduceImpl.class);
- job.setReducerClass(ReduceImpl.class);
-
- FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
- FileOutputFormat.setOutputPath(job, outputPath);
- FileOutputFormat.setCompressOutput(job, true);
-
- JobClient.runJob(job);
-
- WordCountUtil.validateCountsFile();
- } finally {
- outputPath.getFileSystem(job).delete(outputPath);
- }
-
+ outputPath.getFileSystem(job).delete(outputPath);
+ WordCountUtil.writeLinesFile();
+
+ job.setJobName("wordcount");
+
+ AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
+ AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
+
+ job.setMapperClass(MapImpl.class);
+ job.setCombinerClass(ReduceImpl.class);
+ job.setReducerClass(ReduceImpl.class);
+
+ FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+ FileOutputFormat.setOutputPath(job, outputPath);
+ FileOutputFormat.setCompressOutput(job, true);
+
+ JobClient.runJob(job);
+
+ WordCountUtil.validateCountsFile();
}
}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRequestor;
+
+/** Base class for Java tether mapreduce programs. Useless except for testing,
+ * since it's already possible to write Java MapReduce programs without
+ * tethering. Also serves as an example of how a framework may be
+ * implemented. */
+public abstract class TetherTask<IN,MID,OUT> {
+ static final Logger LOG = LoggerFactory.getLogger(TetherTask.class);
+
+ private Transceiver clientTransceiver;
+ private OutputProtocol outputClient;
+
+ private TaskType taskType;
+ private int partitions;
+
+ private DecoderFactory decoderFactory = DecoderFactory.defaultFactory();
+ private BinaryDecoder decoder;
+
+ private SpecificDatumReader<IN> inReader;
+ private SpecificDatumReader<MID> midReader;
+ private IN inRecord;
+ private MID midRecord;
+ private MID midRecordSpare;
+ private Collector<MID> midCollector;
+ private Collector<OUT> outCollector;
+
+ private static class Buffer extends ByteArrayOutputStream {
+ public ByteBuffer data() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+ }
+
+ /** Collector for map and reduce output values. */
+ public class Collector<T> {
+ private SpecificDatumWriter<T> writer;
+ private Buffer buffer = new Buffer();
+ private BinaryEncoder encoder = new BinaryEncoder(buffer);
+
+ private Collector(Schema schema) {
+ this.writer = new SpecificDatumWriter<T>(schema);
+ }
+
+ /** Collect a map or reduce output value. */
+ public void collect(T record) throws IOException {
+ buffer.reset();
+ writer.write(record, encoder);
+ outputClient.output(buffer.data());
+ }
+
+ /** Collect a pre-partitioned map output value. */
+ public void collect(T record, int partition) throws IOException {
+ buffer.reset();
+ writer.write(record, encoder);
+ outputClient.outputPartitioned(partition, buffer.data());
+ }
+ }
+
+ void open(int inputPort) throws IOException {
+ // open output client, connecting to parent
+ String clientPortString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
+ if (clientPortString == null)
+ throw new RuntimeException("AVRO_TETHER_OUTPUT_PORT env var is null");
+ int clientPort = Integer.parseInt(clientPortString);
+ this.clientTransceiver =
+ new SocketTransceiver(new InetSocketAddress(clientPort));
+ this.outputClient = (OutputProtocol)
+ SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+
+ // send inputPort to parent
+ outputClient.configure(inputPort);
+ }
+
+ void configure(TaskType taskType, Utf8 inSchemaText, Utf8 outSchemaText) {
+ this.taskType = taskType;
+ try {
+ Schema inSchema = Schema.parse(inSchemaText.toString());
+ Schema outSchema = Schema.parse(outSchemaText.toString());
+ switch (taskType) {
+ case MAP:
+ this.inReader = new SpecificDatumReader<IN>(inSchema);
+ this.midCollector = new Collector<MID>(outSchema);
+ break;
+ case REDUCE:
+ this.midReader = new SpecificDatumReader<MID>(inSchema);
+ this.outCollector = new Collector<OUT>(outSchema);
+ break;
+ }
+ } catch (Throwable e) {
+ fail(e.toString());
+ }
+ }
+
+ void partitions(int partitions) { this.partitions = partitions; }
+
+ /** Return the number of map output partitions of this job. */
+ public int partitions() { return partitions; }
+
+ void input(ByteBuffer data, long count) {
+ try {
+ decoder = decoderFactory.createBinaryDecoder(data.array(), decoder);
+ for (long i = 0; i < count; i++) {
+ switch (taskType) {
+ case MAP:
+ inRecord = inReader.read(inRecord, decoder);
+ map(inRecord, midCollector);
+ break;
+ case REDUCE:
+ MID prev = midRecord;
+ midRecord = midReader.read(midRecordSpare, decoder);
+ if (prev != null && !midRecord.equals(prev))
+ reduceFlush(prev, outCollector);
+ reduce(midRecord, outCollector);
+ midRecordSpare = prev;
+ break;
+ }
+ }
+ } catch (Throwable e) {
+ LOG.warn("failing: "+e, e);
+ fail(e.toString());
+ }
+ }
+
+ void complete() {
+ if (taskType == TaskType.REDUCE && midRecord != null)
+ try {
+ reduceFlush(midRecord, outCollector);
+ } catch (Throwable e) {
+ LOG.warn("failing: "+e, e);
+ fail(e.toString());
+ }
+ outputClient.complete();
+ }
+
+ /** Called with input values to generate intermediate values. */
+ public abstract void map(IN record, Collector<MID> collector)
+ throws IOException;
+ /** Called with sorted intermediate values. */
+ public abstract void reduce(MID record, Collector<OUT> collector)
+ throws IOException;
+ /** Called with the last intermediate value in each equivalence run. */
+ public abstract void reduceFlush(MID record, Collector<OUT> collector)
+ throws IOException;
+
+ /** Call to update task status. */
+ public void status(String message) {
+ outputClient.status(new Utf8(message));
+ }
+
+ /** Call to increment a counter. */
+ public void count(String group, String name, long amount) {
+ outputClient.count(new Utf8(group), new Utf8(name), amount);
+ }
+
+ /** Call to fail the task. */
+ public void fail(String message) {
+ outputClient.fail(new Utf8(message));
+ close();
+ }
+
+ void close() {
+ if (clientTransceiver != null)
+ try {
+ clientTransceiver.close();
+ } catch (IOException e) {} // ignore
+ }
+
+}
Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.specific.SpecificResponder;
+
+/** Java implementation of a tether executable. Useless except for testing,
+ * since it's already possible to write Java MapReduce programs without
+ * tethering. Also serves as an example of how a framework may be
+ * implemented. */
+public class TetherTaskRunner implements InputProtocol {
+ static final Logger LOG = LoggerFactory.getLogger(TetherTaskRunner.class);
+
+ private SocketServer inputServer;
+ private TetherTask task;
+
+ public TetherTaskRunner(TetherTask task) throws IOException {
+ this.task = task;
+
+ // start input server
+ this.inputServer = new SocketServer
+ (new SpecificResponder(InputProtocol.class, this),
+ new InetSocketAddress(0));
+
+ // open output to parent
+ task.open(inputServer.getPort());
+ }
+
+ @Override public void configure(TaskType taskType,
+ Utf8 inSchema,
+ Utf8 outSchema) {
+ LOG.info("got configure");
+ task.configure(taskType, inSchema, outSchema);
+ }
+
+ @Override public synchronized void input(ByteBuffer data, long count) {
+ task.input(data, count);
+ }
+
+ @Override public void partitions(int partitions) {
+ task.partitions(partitions);
+ }
+
+ @Override public void abort() {
+ LOG.info("got abort");
+ close();
+ }
+
+ @Override public synchronized void complete() {
+ LOG.info("got input complete");
+ task.complete();
+ close();
+ }
+
+ /** Wait for task to complete. */
+ public void join() throws InterruptedException {
+ inputServer.join();
+ }
+
+ private void close() {
+ task.close();
+ if (inputServer != null)
+ inputServer.close();
+ }
+}