You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/06/15 20:21:19 UTC

svn commit: r954998 [1/2] - in /avro/trunk: ./ lang/java/ lang/java/src/java/org/apache/avro/file/ lang/java/src/java/org/apache/avro/generic/ lang/java/src/java/org/apache/avro/io/ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/java/org/apache/...

Author: cutting
Date: Tue Jun 15 18:21:18 2010
New Revision: 954998

URL: http://svn.apache.org/viewvc?rev=954998&view=rev
Log:
AVRO-512. Java: Define and implement MapReduce connector protocols.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
    avro/trunk/share/schemas/org/apache/avro/mapred/
    avro/trunk/share/schemas/org/apache/avro/mapred/tether/
    avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr
    avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/build.xml
    avro/trunk/lang/java/ivy.xml
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
    avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
    avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java
    avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java
    avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java
    avro/trunk/lang/java/src/test/bin/test_tools.sh
    avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jun 15 18:21:18 2010
@@ -10,6 +10,9 @@ Avro 1.4.0 (unreleased)
 
     AVRO-285: Specify one-way messages and implement in Java. (cutting)
 
+    AVRO-512. Java: Define and implement MapReduce connector
+    protocols. (cutting)
+
   IMPROVEMENTS
 
     AVRO-501. missing function in C api to access array elements after 

Modified: avro/trunk/lang/java/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/build.xml?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/build.xml (original)
+++ avro/trunk/lang/java/build.xml Tue Jun 15 18:21:18 2010
@@ -151,7 +151,8 @@
 
   <target name="compile" depends="javacc,ivy-retrieve">
     <java-compiler
-       excludes="**/ipc/** **/*Requestor.java **/*Responder.java **/tool/**">
+       excludes="**/ipc/** **/*Requestor.java **/*Responder.java
+       **/tool/** **/mapred/**">
       <src path="${build.dir}/src"/>
       <src path="${java.src.dir}"/>
     </java-compiler>
@@ -415,6 +416,24 @@
     <test-runner files.location="${test.java.classes}" tests.pattern="**/TestProtocolSpecific$InteropTest.class" />
   </target>
 
+  <target name="tether-wordcount-jar" depends="compile-test-java"
+	  description="Build tether wordcount jar file">
+    <jar jarfile="${test.java.build.dir}/wordcount.jar">
+      <manifest>
+        <attribute name="Main-Class"
+		   value="org.apache.avro.mapred.tether.WordCountTask"/>
+      </manifest>
+      <fileset dir="${build.classes}" />
+      <zipgroupfileset dir="${ivy.lib}" includes="jackson*.jar"/>
+      <zipgroupfileset dir="${ivy.test.lib}" includes="slf4j*.jar"/>
+      <fileset dir="${test.java.classes}"
+	       includes="org/apache/avro/mapred/tether/*"/>
+      <fileset dir="${test.java.generated.classes}"
+	       includes="org/apache/avro/mapred/*"/>
+    </jar>
+    <chmod file="${test.java.build.dir}/wordcount.jar" perm="ugo+x"/>
+  </target>
+
   <target name="tools" depends="compile,ivy-retrieve-tools"
 	  description="Build standalone tools jar file">
     <jar jarfile="${build.dir}/avro-tools-${version}.jar">
@@ -430,13 +449,14 @@
     <chmod file="${build.dir}/avro-tools-${version}.jar" perm="ugo+x"/>
   </target>
 
-  <target name="test-tools" depends="tools,compile-test-java"
+  <target name="test-tools" depends="tools,unit-test-java,tether-wordcount-jar"
    description="Tests tools">
     <exec executable="${basedir}/src/test/bin/test_tools.sh"
           failonerror="true">
       <env key="TOOLS" value="${build.dir}/avro-tools-${version}.jar"/>
       <env key="TMPDIR" value="${test.java.build.dir}/tools"/>
       <env key="JAVA_HOME" value="${java.home}"/>
+      <env key="HADOOP_LOG_DIR" value="${build.dir}/logs"/>
     </exec>
   </target>
 

Modified: avro/trunk/lang/java/ivy.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ivy.xml?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/ivy.xml (original)
+++ avro/trunk/lang/java/ivy.xml Tue Jun 15 18:21:18 2010
@@ -18,10 +18,10 @@
 <ivy-module version="2.0"
             xmlns:e="http://ant.apache.org/ivy/extra">
 
-  <info organisation="org.apache.hadoop"
+  <info organisation="org.apache.avro"
     module="${name}" revision="${version}">
     <license name="Apache 2.0"/>
-    <ivyauthor name="Apache Hadoop" url="http://hadoop.apache.org"/>
+    <ivyauthor name="Apache Avro" url="http://avro.apache.org"/>
     <description>Avro</description>
   </info>
 
@@ -62,9 +62,9 @@
     <dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"
         conf="build->default;test->default;tools->default"/>
     <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"
-                conf="build->default" transitive="false"/>
+                conf="build->default;tools->default" transitive="false"/>
     <dependency org="commons-httpclient" name="commons-httpclient" rev="3.0.1"
-		conf="test->default"/>
+		conf="test->default;tools->default"/>
   </dependencies>
 
 </ivy-module>

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Tue Jun 15 18:21:18 2010
@@ -54,6 +54,8 @@ public class DataFileStream<D> implement
 
   Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
+  ByteBuffer blockBuffer;
+  long blockCount;                              // # entries in block
   long blockRemaining;                          // # entries remaining in block
   byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
   byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
@@ -159,7 +161,7 @@ public class DataFileStream<D> implement
         }
         if (hasNextBlock()) {
           block = nextBlock(block);
-          ByteBuffer blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
+          blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
           blockBuffer = codec.decompress(blockBuffer);
           datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
               blockBuffer.array(), blockBuffer.arrayOffset() +
@@ -199,6 +201,20 @@ public class DataFileStream<D> implement
     return result;
   }
 
+  /** Expert: Return the next block in the file, as binary-encoded data. */
+  public ByteBuffer nextBlock() throws IOException {
+    if (!hasNext())
+      throw new NoSuchElementException();
+    if (blockRemaining != blockCount)
+      throw new IllegalStateException("Not at block start.");
+    blockRemaining = 0;
+    datumIn = null;
+    return blockBuffer;
+  }
+
+  /** Expert: Return the count of items in the current block. */
+  public long getBlockCount() { return blockCount; }
+
   protected void blockFinished() throws IOException {
     // nothing for the stream impl
   }
@@ -214,6 +230,7 @@ public class DataFileStream<D> implement
         throw new IOException("Block size invalid or too large for this " +
           "implementation: " + blockSize);
       }
+      blockCount = blockRemaining;
       availableBlock = true;
       return true;
     } catch (EOFException eof) {

Modified: avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Tue Jun 15 18:21:18 2010
@@ -248,6 +248,18 @@ public class DataFileWriter<D> implement
       writeBlock();
   }
 
+  /** Expert: Append a pre-encoded datum to the file.  No validation is
+   * performed to check that the encoding conforms to the file's schema.
+   * Appending non-conforming data may result in an unreadable file. */
+  public void appendEncoded(ByteBuffer datum) throws IOException {
+    assertOpen();
+    int start = datum.position();
+    buffer.write(datum.array(), start, datum.limit()-start);
+    blockCount++;
+    if (buffer.size() >= syncInterval)
+      writeBlock();
+  }
+
   /**
    * Appends data from another file.  otherFile must have the same schema.
    * Data blocks will be copied without de-serializing data.  If the codecs

Modified: avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/generic/GenericData.java Tue Jun 15 18:21:18 2010
@@ -441,6 +441,8 @@ public class GenericData {
       return hashCode;
     case UNION:
       return hashCode(o, s.getTypes().get(resolveUnion(s, o)));
+    case ENUM:
+      return s.getEnumOrdinal(o.toString());
     case NULL:
       return 0;
     default:

Modified: avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/io/BinaryData.java Tue Jun 15 18:21:18 2010
@@ -184,4 +184,111 @@ public class BinaryData {
     return l1 - l2;
   }
 
+  private static class HashData {
+    private final BufferAccessor bytes;
+    private final BinaryDecoder decoder;
+    public HashData() {
+      this.decoder = new BinaryDecoder(new byte[0], 0, 0);
+      this.bytes = decoder.getBufferAccessor();
+    }
+    public void set(byte[] bytes, int start, int len) {
+      this.decoder.init(bytes, start, len);
+    }
+  }
+
+  private static final ThreadLocal<HashData> HASH_DATA
+    = new ThreadLocal<HashData>() {
+    @Override protected HashData initialValue() { return new HashData(); }
+  };
+
+  /** Hash binary encoded data. Consistent with {@link
+   * org.apache.avro.generic.GenericData#hashCode(Object, Schema)}.*/
+  public static int hashCode(byte[] bytes, int start, int length,
+                             Schema schema) {
+    HashData data = HASH_DATA.get();
+    data.set(bytes, start, length);
+    try {
+      return hashCode(data, schema);
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  private static int hashCode(HashData data, Schema schema)
+    throws IOException {
+    Decoder decoder = data.decoder;
+    switch (schema.getType()) {
+    case RECORD: {
+      int hashCode = 1;
+      for (Field field : schema.getFields()) {
+        if (field.order() == Field.Order.IGNORE) {
+          GenericDatumReader.skip(field.schema(), decoder);
+          continue;
+        }
+        hashCode = hashCode*31 + hashCode(data, field.schema());
+      }
+      return hashCode;
+    }
+    case ENUM: case INT:
+      return decoder.readInt();
+    case FLOAT:
+      return Float.floatToIntBits(decoder.readFloat());
+    case LONG: {
+      long l = decoder.readLong();
+      return (int)(l^(l>>>32));
+    }
+    case DOUBLE: {
+      long l = Double.doubleToLongBits(decoder.readDouble());
+      return (int)(l^(l>>>32));
+    }
+    case ARRAY: {
+      Schema elementType = schema.getElementType();
+      int hashCode = 1;
+      for (long l = decoder.readArrayStart(); l != 0; l = decoder.arrayNext())
+        for (long i = 0; i < l; i++)
+          hashCode = hashCode*31 + hashCode(data, elementType);
+      return hashCode;
+    }
+    case MAP:
+      throw new AvroRuntimeException("Can't hashCode maps!");
+    case UNION:
+      return hashCode(data, schema.getTypes().get(decoder.readInt()));
+    case FIXED:
+      return hashBytes(1, data, schema.getFixedSize(), false);
+    case STRING:
+      return hashBytes(0, data, decoder.readInt(), false);
+    case BYTES:
+      return hashBytes(1, data, decoder.readInt(), true);
+    case BOOLEAN:
+      return decoder.readBoolean() ? 1231 : 1237;
+    case NULL:
+      return 0;
+    default:
+      throw new AvroRuntimeException("Unexpected schema to hashCode!");
+    }
+  }
+
+  private static int hashBytes(int init, HashData data, int len, boolean rev)
+    throws IOException {
+    int hashCode = init;
+    byte[] bytes = data.bytes.getBuf();
+    int start = data.bytes.getPos();
+    int end = start+len;
+    if (rev) 
+      for (int i = end-1; i >= start; i--)
+        hashCode = hashCode*31 + bytes[i];
+    else
+      for (int i = start; i < end; i++)
+        hashCode = hashCode*31 + bytes[i];
+    data.decoder.skipFixed(len);
+    return hashCode;
+  }
+
+  /** Skip a binary-encoded long, returning the position after it. */
+  public static int skipLong(byte[] bytes, int start) {
+    int i = start;
+    for (int b = bytes[i++]; ((b & 0x80) != 0); b = bytes[i++]) {}
+    return i;
+  }
+
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java Tue Jun 15 18:21:18 2010
@@ -76,16 +76,14 @@ public class ByteBufferOutputStream exte
     buffer.put(b, off, len);
   }
 
-  /** Add a buffer to the output without copying, if possible.
-   * Sets buffer's position to its limit.
-   */
+  /** Add a buffer to the output without copying, if possible. */
   public void writeBuffer(ByteBuffer buffer) throws IOException {
     if (buffer.remaining() < BUFFER_SIZE) {
       write(buffer.array(), buffer.position(), buffer.remaining());
-    } else {
-      buffers.add(buffer);                        // append w/o copying
+    } else {                                      // append w/o copying bytes
+      ByteBuffer dup = buffer.duplicate();
+      dup.position(buffer.limit());               // ready for flip
+      buffers.add(dup);
     }
-    buffer.position(buffer.limit());              // mark data as consumed
   }
 }
-

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroJob.java Tue Jun 15 18:21:18 2010
@@ -21,6 +21,7 @@ package org.apache.avro.mapred;
 import java.util.Collection;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.avro.Schema;
 
@@ -32,13 +33,15 @@ public class AvroJob {
   static final String API_SPECIFIC = "specific";
 
   static final String INPUT_API = "avro.input.api";
-  static final String INPUT_SCHEMA = "avro.input.schema";
-
-  static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+  static final String OUTPUT_API = "avro.output.api";
   static final String MAP_OUTPUT_API = "avro.map.output.api";
 
-  static final String OUTPUT_SCHEMA = "avro.output.schema";
-  static final String OUTPUT_API = "avro.output.api";
+  /** The configuration key for a job's input schema. */
+  public static final String INPUT_SCHEMA = "avro.input.schema";
+  /** The configuration key for a job's intermediate schema. */
+  public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
+  /** The configuration key for a job's output schema. */
+  public static final String OUTPUT_SCHEMA = "avro.output.schema";
 
   /** Configure a job's map input to use Avro's generic API. */
   public static void setInputGeneric(JobConf job, Schema s) {
@@ -59,32 +62,53 @@ public class AvroJob {
 
   /** Configure a job's map output key schema using Avro's generic API. */
   public static void setMapOutputGeneric(JobConf job, Schema s) {
-    job.set(MAP_OUTPUT_SCHEMA, s.toString());
     job.set(MAP_OUTPUT_API, API_GENERIC);
+    setMapOutputSchema(job, s);
     configureAvroOutput(job);
   }
 
   /** Configure a job's map output key schema using Avro's specific API. */
   public static void setMapOutputSpecific(JobConf job, Schema s) {
-    job.set(MAP_OUTPUT_SCHEMA, s.toString());
     job.set(MAP_OUTPUT_API, API_SPECIFIC);
+    setMapOutputSchema(job, s);
     configureAvroOutput(job);
   }
 
   /** Configure a job's output key schema using Avro's generic API. */
   public static void setOutputGeneric(JobConf job, Schema s) {
-    job.set(OUTPUT_SCHEMA, s.toString());
     job.set(OUTPUT_API, API_GENERIC);
+    setOutputSchema(job, s);
     configureAvroOutput(job);
   }
 
   /** Configure a job's output key schema using Avro's specific API. */
   public static void setOutputSpecific(JobConf job, Schema s) {
-    job.set(OUTPUT_SCHEMA, s.toString());
     job.set(OUTPUT_API, API_SPECIFIC);
+    setOutputSchema(job, s);
     configureAvroOutput(job);
   }
 
+  /** Set a job's map output key schema. */
+  public static void setMapOutputSchema(JobConf job, Schema s) {
+    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+  }
+
+  /** Return a job's map output key schema. */
+  public static Schema getMapOutputSchema(Configuration job) {
+    return Schema.parse(job.get(AvroJob.MAP_OUTPUT_SCHEMA,
+                                job.get(AvroJob.OUTPUT_SCHEMA)));
+  }
+
+  /** Set a job's output key schema. */
+  public static void setOutputSchema(JobConf job, Schema s) {
+    job.set(OUTPUT_SCHEMA, s.toString());
+  }
+
+  /** Return a job's output key schema. */
+  public static Schema getOutputSchema(Configuration job) {
+    return Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+  }
+
   private static void configureAvroOutput(JobConf job) {
     job.setOutputKeyClass(AvroWrapper.class);
     job.setOutputKeyComparatorClass(AvroKeyComparator.class);

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeyComparator.java Tue Jun 15 18:21:18 2010
@@ -38,8 +38,7 @@ public class AvroKeyComparator<T>
   public void setConf(Configuration conf) {
     super.setConf(conf);
     if (conf != null) {
-      schema = Schema.parse(conf.get(AvroJob.MAP_OUTPUT_SCHEMA,
-                                     conf.get(AvroJob.OUTPUT_SCHEMA)));
+      schema = AvroJob.getMapOutputSchema(conf);
       String api = getConf().get(AvroJob.MAP_OUTPUT_API,
                                  getConf().get(AvroJob.OUTPUT_API));
       model = AvroJob.API_SPECIFIC.equals(api)

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroKeySerialization.java Tue Jun 15 18:21:18 2010
@@ -52,10 +52,7 @@ public class AvroKeySerialization<T> ext
     //  We need not rely on mapred.task.is.map here to determine whether map
     //  output or final output is desired, since the mapreduce framework never
     //  creates a deserializer for final output, only for map output.
-    String json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA,
-                                getConf().get(AvroJob.OUTPUT_SCHEMA));
-    Schema schema = Schema.parse(json);
-
+    Schema schema = AvroJob.getMapOutputSchema(getConf());
     String api = getConf().get(AvroJob.MAP_OUTPUT_API,
                                getConf().get(AvroJob.OUTPUT_API));
     DatumReader<T> reader = AvroJob.API_SPECIFIC.equals(api)
@@ -105,14 +102,13 @@ public class AvroKeySerialization<T> ext
     // or final output is needed.
     boolean isMap = getConf().getBoolean("mapred.task.is.map", false);
 
-    String json = getConf().get(AvroJob.OUTPUT_SCHEMA);
-    if (isMap) 
-      json = getConf().get(AvroJob.MAP_OUTPUT_SCHEMA, json);
-    Schema schema = Schema.parse(json);
+    Schema schema = isMap
+      ? AvroJob.getMapOutputSchema(getConf())
+      : AvroJob.getOutputSchema(getConf());
 
     String api = getConf().get(AvroJob.OUTPUT_API);
     if (isMap) 
-      api = getConf().get(AvroJob.MAP_OUTPUT_API, json);
+      api = getConf().get(AvroJob.MAP_OUTPUT_API, api);
 
     DatumWriter<T> writer = AvroJob.API_SPECIFIC.equals(api)
       ? new SpecificDatumWriter<T>(schema)

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroOutputFormat.java Tue Jun 15 18:21:18 2010
@@ -40,10 +40,14 @@ import org.apache.avro.file.CodecFactory
 public class AvroOutputFormat <T>
   extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
-  final static String EXT = ".avro";
+  /** The file name extension for avro data files. */
+  public final static String EXT = ".avro";
 
-  private static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
-  private static final int DEFAULT_DEFLATE_LEVEL = 1;
+  /** The configuration key for Avro deflate level. */
+  public static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+
+  /** The default deflate level. */
+  public static final int DEFAULT_DEFLATE_LEVEL = 1;
 
   /** Enable output compression using the deflate codec and specify its level.*/
   public static void setDeflateLevel(JobConf job, int level) {
@@ -56,7 +60,7 @@ public class AvroOutputFormat <T>
                     String name, Progressable prog)
     throws IOException {
 
-    Schema schema = Schema.parse(job.get(AvroJob.OUTPUT_SCHEMA));
+    Schema schema = AvroJob.getOutputSchema(job);
 
     DatumWriter<T> datumWriter =
       AvroJob.API_SPECIFIC.equals(job.get(AvroJob.OUTPUT_API))

Modified: avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/package.html Tue Jun 15 18:21:18 2010
@@ -18,8 +18,8 @@
 -->
 
 <body>
-Tools to permit using Avro data
-with <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs.
+Run <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs over
+Avro data, with map and reduce functions written in Java.
 
 <p>Avro data files do not contain key/value pairs as expected by
   Hadoop's MapReduce API, but rather just a sequence of values.  Thus

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherData.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+/** A wrapper for a ByteBuffer containing binary-encoded data. */
+class TetherData {
+  private int count = 1;                          // only used for task input
+  private ByteBuffer buffer;
+
+  public TetherData() {}
+  public TetherData(ByteBuffer buffer) { this.buffer = buffer; }
+
+  /** Return the count of records in the buffer.  Used for task input only.*/
+  public int count() { return count; }
+
+  /** Set the count of records in the buffer.  Used for task input only. */
+  public void count(int count) { this.count = count; }
+    
+  /** Return the buffer. */
+  public ByteBuffer buffer() { return buffer; }
+
+  /** Set the buffer. */
+  public void buffer(ByteBuffer buffer) { this.buffer = buffer; }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherInputFormat.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.mapred.AvroOutputFormat;
+
+/** An {@link org.apache.hadoop.mapred.InputFormat} for tethered Avro input. */
+class TetherInputFormat
+  extends FileInputFormat<TetherData, NullWritable> {
+
+  @Override
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job))
+      if (file.getPath().getName().endsWith(AvroOutputFormat.EXT))
+        result.add(file);
+    return result.toArray(new FileStatus[0]);
+  }
+
+  @Override
+  public RecordReader<TetherData, NullWritable>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+    reporter.setStatus(split.toString());
+    return new TetherRecordReader(job, (FileSplit)split);
+  }
+
+}
+

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherJob.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.File;
+import java.util.List;
+import java.util.Collection;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+import org.apache.avro.Schema;
+import org.apache.avro.tool.Tool;
+import org.apache.avro.mapred.AvroJob;
+
+/** Constructs and submits tether jobs. This may either be used as a
+ * commandline-based or API-based method to launch tether jobs. */
+public class TetherJob extends Configured implements Tool {
+
+  /** Get the URI of the application's executable. */
+  public static URI getExecutable(JobConf job) {
+    try {
+      return new URI(job.get("avro.tether.executable"));
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  /** Set the URI for the application's executable. Normally this in HDFS. */
+  public static void setExecutable(JobConf job, URI executable) {
+    job.set("avro.tether.executable", executable.toString());
+  }
+
+  /** Submit a job to the map/reduce cluster. All of the necessary
+   * modifications to the job to run under tether are made to the
+   * configuration.
+   */
+  public static RunningJob runJob(JobConf job) throws IOException {
+    setupTetherJob(job);
+    return JobClient.runJob(job);
+  }
+
+  /** Submit a job to the Map-Reduce framework. */
+  public static RunningJob submitJob(JobConf conf) throws IOException {
+    setupTetherJob(conf);
+    return new JobClient(conf).submitJob(conf);
+  }
+  
+  private static void setupTetherJob(JobConf job) throws IOException {
+    job.setMapRunnerClass(TetherMapRunner.class);
+    job.setPartitionerClass(TetherPartitioner.class);
+    job.setReducerClass(TetherReducer.class);
+
+    job.setInputFormat(TetherInputFormat.class);
+    job.setOutputFormat(TetherOutputFormat.class);
+
+    job.setOutputKeyClass(TetherData.class);
+    job.setOutputKeyComparatorClass(TetherKeyComparator.class);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    // add TetherKeySerialization to io.serializations
+    Collection<String> serializations =
+      job.getStringCollection("io.serializations");
+    if (!serializations.contains(TetherKeySerialization.class.getName())) {
+      serializations.add(TetherKeySerialization.class.getName());
+      job.setStrings("io.serializations",
+                     serializations.toArray(new String[0]));
+    }
+    
+    DistributedCache.addCacheFile(getExecutable(job), job);
+  }
+
+  // Tool methods
+
+  @Override
+  public String getName() { return "tether"; }
+
+  @Override
+  public String getShortDescription() {return "Run a tethered mapreduce job.";}
+
+  @Override
+  public int run(InputStream ins, PrintStream outs, PrintStream err,
+                 List<String> args) throws Exception {
+
+    OptionParser p = new OptionParser();
+    OptionSpec<URI> exec =
+      p.accepts("program", "executable program, usually in HDFS")
+      .withRequiredArg().ofType(URI.class);
+    OptionSpec<String> in = p.accepts("in", "comma-separated input paths")
+      .withRequiredArg().ofType(String.class);
+    OptionSpec<Path> out = p.accepts("out", "output directory")
+      .withRequiredArg().ofType(Path.class);
+    OptionSpec<File> outSchema = p.accepts("outschema", "output schema file")
+      .withRequiredArg().ofType(File.class);
+    OptionSpec<File> mapOutSchema =
+      p.accepts("outschemamap", "map output schema file, if different")
+      .withOptionalArg().ofType(File.class);
+    OptionSpec<Integer> reduces = p.accepts("reduces", "number of reduces")
+      .withOptionalArg().ofType(Integer.class);
+
+    JobConf job = new JobConf();
+      
+    try {
+      OptionSet opts = p.parse(args.toArray(new String[0]));
+      FileInputFormat.addInputPaths(job, in.value(opts));
+      FileOutputFormat.setOutputPath(job, out.value(opts));
+      TetherJob.setExecutable(job, exec.value(opts));
+      AvroJob.setOutputSchema(job, Schema.parse(outSchema.value(opts)));
+      if (opts.hasArgument(mapOutSchema))
+        AvroJob.setMapOutputSchema(job, Schema.parse(mapOutSchema.value(opts)));
+      if (opts.hasArgument(reduces))
+        job.setNumReduceTasks(reduces.value(opts));
+    } catch (Exception e) {
+      p.printHelpOn(err);
+      return -1;
+    }
+
+    runJob(job);
+    return 0;
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeyComparator.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+
+/** The {@link RawComparator} used by jobs configured with {@link TetherJob}. */
+class TetherKeyComparator
+  extends Configured implements RawComparator<TetherData> {
+
+  private Schema schema;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf != null)
+      schema = AvroJob.getMapOutputSchema(conf);
+  }
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    int diff = BinaryData.compare(b1, BinaryData.skipLong(b1, s1),
+                                  b2, BinaryData.skipLong(b2, s2),
+                                  schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+  @Override
+  public int compare(TetherData x, TetherData y) {
+    ByteBuffer b1 = x.buffer(), b2 = y.buffer();
+    int diff = BinaryData.compare(b1.array(), b1.position(), 
+                                  b2.array(), b2.position(),
+                                  schema);
+    return diff == 0 ? -1 : diff;
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+/** A {@link Serialization} for {@link TetherData}. */
+class TetherKeySerialization
+  extends Configured implements Serialization<TetherData> {
+
+  public boolean accept(Class<?> c) {
+    return TetherData.class.isAssignableFrom(c);
+  }
+  
+  public Deserializer<TetherData> getDeserializer(Class<TetherData> c) {
+    return new TetherDataDeserializer();
+  }
+  
+  private static final DecoderFactory FACTORY = new DecoderFactory();
+  static { FACTORY.configureDirectDecoder(true); }
+
+  private class TetherDataDeserializer implements Deserializer<TetherData> {
+    private BinaryDecoder decoder;
+    
+    public void open(InputStream in) {
+      this.decoder = FACTORY.createBinaryDecoder(in, decoder);
+    }
+    
+    public TetherData deserialize(TetherData datum) throws IOException {
+      if (datum == null) datum = new TetherData();
+      datum.buffer(decoder.readBytes(datum.buffer()));
+      return datum;
+    }
+
+    public void close() throws IOException {
+      decoder.inputStream().close();
+    }
+  }
+  
+  public Serializer<TetherData> getSerializer(Class<TetherData> c) {
+    return new TetherDataSerializer();
+  }
+
+  private class TetherDataSerializer implements Serializer<TetherData> {
+
+    private OutputStream out;
+    private BinaryEncoder encoder;
+    
+    public void open(OutputStream out) {
+      this.out = out;
+      this.encoder = new BinaryEncoder(out);
+    }
+
+    public void serialize(TetherData datum) throws IOException {
+      encoder.writeBytes(datum.buffer());
+    }
+
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherMapRunner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapRunner;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.mapred.AvroJob;
+
+class TetherMapRunner
+  extends MapRunner<TetherData, NullWritable, TetherData, NullWritable> {
+
+  static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+
+  private JobConf job;
+  private TetheredProcess process;
+
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void run(RecordReader<TetherData, NullWritable> recordReader,
+                  OutputCollector<TetherData, NullWritable> collector,
+                  Reporter reporter) throws IOException {
+    try {
+      // start tethered process
+      process = new TetheredProcess(job, collector, reporter);
+
+      // configure it
+      process.inputClient.configure
+        (TaskType.MAP, 
+         new Utf8(job.get(AvroJob.INPUT_SCHEMA)),
+         new Utf8(AvroJob.getMapOutputSchema(job).toString()));
+         
+      process.inputClient.partitions(job.getNumReduceTasks());
+
+      // run map
+      Counter inputRecordCounter =
+        reporter.getCounter("org.apache.hadoop.mapred.Task$Counter",
+                            "MAP_INPUT_RECORDS");
+      TetherData data = new TetherData();
+      while (recordReader.next(data, NullWritable.get())) {
+        process.inputClient.input(data.buffer(), data.count());
+        inputRecordCounter.increment(data.count()-1);
+        if (process.outputService.isFinished())
+          break;
+      }
+      process.inputClient.complete();
+
+      // wait for completion
+      if (process.outputService.waitForFinish())
+        throw new IOException("Task failed: "+process.outputService.error());
+
+    } catch (Throwable t) {                       // send abort
+      LOG.warn("Task failed", t);
+      process.inputClient.abort();
+      throw new IOException("Task failed: "+t, t);
+
+    } finally {                                   // clean up
+      if (process != null)
+        process.close();
+    }
+  }
+  
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputFormat.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroOutputFormat;
+
+/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+class TetherOutputFormat
+  extends FileOutputFormat<TetherData, NullWritable> {
+
+  /** Enable output compression using the deflate codec and specify its level.*/
+  public static void setDeflateLevel(JobConf job, int level) {
+    FileOutputFormat.setCompressOutput(job, true);
+    job.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, level);
+  }
+
+  @SuppressWarnings("unchecked")
+  public RecordWriter<TetherData, NullWritable>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    Schema schema = AvroJob.getOutputSchema(job);
+    
+    final DataFileWriter writer = new DataFileWriter(new GenericDatumWriter());
+
+    if (FileOutputFormat.getCompressOutput(job)) {
+      int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY,
+                             AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
+      writer.setCodec(CodecFactory.deflateCodec(level));
+    }
+
+    Path path =
+      FileOutputFormat.getTaskOutputPath(job, name+AvroOutputFormat.EXT);
+    writer.create(schema, path.getFileSystem(job).create(path));
+
+    return new RecordWriter<TetherData, NullWritable>() {
+        public void write(TetherData datum, NullWritable ignore)
+          throws IOException {
+          writer.appendEncoded(datum.buffer());
+        }
+        public void close(Reporter reporter) throws IOException {
+          writer.close();
+        }
+      };
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherOutputService.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.util.Utf8;
+
+class TetherOutputService implements OutputProtocol {
+  private Reporter reporter;
+  private OutputCollector<TetherData, NullWritable> collector;
+  private int inputPort;
+  private boolean complete;
+  private String error;
+
+  public TetherOutputService(OutputCollector<TetherData,NullWritable> collector,
+                             Reporter reporter) {
+    this.reporter = reporter;
+    this.collector = collector;
+  }
+
+  public synchronized void configure(int inputPort) {
+    TetherMapRunner.LOG.info("got input port from child");
+    this.inputPort = inputPort;
+    notify();
+  }
+
+  public synchronized int inputPort() throws InterruptedException {
+    while (inputPort == 0) {
+      TetherMapRunner.LOG.info("waiting for input port from child");
+      wait();
+    }
+    return inputPort;
+  }
+
+  public void output(ByteBuffer datum) {
+    try {
+      collector.collect(new TetherData(datum), NullWritable.get());
+    } catch (Throwable e) {
+      TetherMapRunner.LOG.warn("Error: "+e, e);
+      synchronized (this) {
+        error = e.toString();
+      }
+    }
+  }
+
+  public void outputPartitioned(int partition, ByteBuffer datum) {
+    TetherPartitioner.setNextPartition(partition);
+    output(datum);
+  }
+
+  public void status(Utf8 message) { reporter.setStatus(message.toString());  }
+
+
+  public void count(Utf8 group, Utf8 name, long amount) {
+    reporter.getCounter(group.toString(), name.toString()).increment(amount);
+  }
+
+  public synchronized void fail(Utf8 message) {
+    TetherMapRunner.LOG.warn("Failing: "+message);
+    error = message.toString();
+    notify();
+  }
+
+  public synchronized void complete() {
+    TetherMapRunner.LOG.info("got task complete");
+    complete = true;
+    notify();
+  }
+
+  public synchronized boolean isFinished() {
+    return complete || (error != null);
+  }
+
+  public String error() { return error; }
+
+  public synchronized boolean waitForFinish() throws InterruptedException {
+    while (!isFinished())
+      wait();
+    return error != null;
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherPartitoner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+
+class TetherPartitioner implements Partitioner<TetherData, NullWritable> {
+  
+  private static final ThreadLocal<Integer> CACHE = new ThreadLocal<Integer>();
+
+  private Schema schema;
+
+  public void configure(JobConf job) {
+    schema = AvroJob.getMapOutputSchema(job);
+  }
+
+  static void setNextPartition(int newValue) {
+    CACHE.set(newValue);
+  }
+
+  public int getPartition(TetherData key, NullWritable value,
+                          int numPartitions) {
+    Integer result = CACHE.get();
+    if (result != null)                           // return cached value
+      return result;
+
+    ByteBuffer b = key.buffer();
+    int p = b.position();
+    int hashCode = BinaryData.hashCode(b.array(), p, b.limit()-p, schema);
+    if (hashCode < 0)
+      hashCode = -hashCode;
+    return hashCode % numPartitions;
+  }
+
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherRecordReader.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.FsInput;
+
+class TetherRecordReader
+  implements RecordReader<TetherData, NullWritable> {
+
+  private FsInput in;
+  private DataFileReader reader;
+  private long start;
+  private long end;
+
+  public TetherRecordReader(JobConf job, FileSplit split)
+    throws IOException {
+    this.in = new FsInput(split.getPath(), job);
+    this.reader =
+      new DataFileReader<Object>(in, new GenericDatumReader<Object>());
+
+    reader.sync(split.getStart());                    // sync to start
+    this.start = in.tell();
+    this.end = split.getStart() + split.getLength();
+
+    job.set(AvroJob.INPUT_SCHEMA, reader.getSchema().toString());
+  }
+
+  public Schema getSchema() { return reader.getSchema(); }
+
+  public TetherData createKey() { return new TetherData(); }
+  
+  public NullWritable createValue() { return NullWritable.get(); }
+    
+  public boolean next(TetherData data, NullWritable ignore)
+    throws IOException {
+    if (!reader.hasNext() || reader.pastSync(end))
+      return false;
+    data.buffer(reader.nextBlock());
+    data.count((int)reader.getBlockCount());
+    return true;
+  }
+  
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+    }
+  }
+  
+  public long getPos() throws IOException {
+    return in.tell();
+  }
+
+  public void close() throws IOException { reader.close(); }
+  
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetherReducer.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.mapred.AvroJob;
+
+class TetherReducer
+  implements Reducer<TetherData,NullWritable,TetherData,NullWritable> {
+
+  private JobConf job;
+  private TetheredProcess process;
+  private boolean error;
+
+  public void configure(JobConf job) {
+    this.job = job;
+  }
+
+  public void reduce(TetherData datum, Iterator<NullWritable> ignore, 
+                     OutputCollector<TetherData, NullWritable> collector,
+                     Reporter reporter) throws IOException {
+    try {
+      if (process == null) {
+        process = new TetheredProcess(job, collector, reporter);
+        process.inputClient.configure
+          (TaskType.REDUCE,
+           new Utf8(AvroJob.getMapOutputSchema(job).toString()),
+           new Utf8(AvroJob.getOutputSchema(job).toString()));
+      }
+      process.inputClient.input(datum.buffer(), datum.count());
+    } catch (IOException e) {
+      error = true;
+      throw e;
+    } catch (Exception e) {
+      error = true;
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Handle the end of the input by closing down the application.
+   */
+  public void close() throws IOException {
+    if (process == null) return;
+    try {
+      if (error)
+        process.inputClient.abort();
+      else
+        process.inputClient.complete();
+      process.outputService.waitForFinish();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      process.close();
+    }
+  }
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/TetheredProcess.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+
+class TetheredProcess  {
+
+  private JobConf job;
+
+  TetherOutputService outputService;
+  Server outputServer;
+  Process subprocess;
+  Transceiver clientTransceiver;
+  InputProtocol inputClient;
+
+  public TetheredProcess(JobConf job,
+                          OutputCollector<TetherData, NullWritable> collector,
+                          Reporter reporter) throws Exception {
+    try {
+      // start server
+      this.outputService = new TetherOutputService(collector, reporter);
+      this.outputServer = new SocketServer
+        (new SpecificResponder(OutputProtocol.class, outputService),
+         new InetSocketAddress(0));
+      
+      // start sub-process, connecting back to server
+      this.subprocess = startSubprocess(job);
+      
+      // open client, connecting to sub-process
+      this.clientTransceiver =
+        new SocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+      this.inputClient = (InputProtocol)
+        SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
+
+
+    } catch (Exception t) {
+      close();
+      throw t;
+    }
+  }
+
+  public void close() {
+    if (clientTransceiver != null)
+      try {
+        clientTransceiver.close();
+      } catch (IOException e) {}                  // ignore
+    if (subprocess != null)
+      subprocess.destroy();
+    if (outputServer != null)
+      outputServer.close();
+  }
+
+  private Process startSubprocess(JobConf job)
+    throws IOException, InterruptedException {
+    // get the executable command
+    List<String> command = new ArrayList<String>();
+    Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
+    if (localFiles == null) {                     // until MAPREDUCE-476
+      URI[] files = DistributedCache.getCacheFiles(job);
+      localFiles = new Path[] { new Path(files[0].toString()) };
+    }
+    String executable = localFiles[0].toString();
+    FileUtil.chmod(executable, "a+x");
+    command.add(executable);
+
+    if (System.getProperty("hadoop.log.dir") == null
+        && System.getenv("HADOOP_LOG_DIR") != null)
+      System.setProperty("hadoop.log.dir", System.getenv("HADOOP_LOG_DIR"));
+
+    // wrap the command in a stdout/stderr capture
+    TaskAttemptID taskid = TaskAttemptID.forName(job.get("mapred.task.id"));
+    File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    long logLength = TaskLog.getTaskLogLength(job);
+    command = TaskLog.captureOutAndError(command, stdout, stderr, logLength);
+    stdout.getParentFile().mkdirs();
+    stderr.getParentFile().mkdirs();
+
+    // add output server's port to env
+    Map<String, String> env = new HashMap<String,String>();
+    env.put("AVRO_TETHER_OUTPUT_PORT",
+            Integer.toString(outputServer.getPort()));
+
+    // start child process
+    ProcessBuilder builder = new ProcessBuilder(command);
+    builder.environment().putAll(env);
+    return builder.start();
+  }
+  
+}

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/tether/package.html Tue Jun 15 18:21:18 2010
@@ -0,0 +1,32 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Run <a href="http://hadoop.apache.org/">Hadoop</a> MapReduce jobs over
+Avro data, with map and reduce functions run in a sub-process.  This
+permits MapReduce programs over Avro data in languages besides Java.
+<p>
+Each language will provide a framework to permit easy implementation
+of MapReduce programs in that language.  Currently only a Java
+framework has been implemented, for test purposes, so this feature is
+not yet useful.
+<p>
+This is still an experimental API, subject to change.
+</body>
+</html>

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificResponder.java Tue Jun 15 18:21:18 2010
@@ -89,6 +89,7 @@ public class SpecificResponder extends R
       for (Schema.Field param: message.getRequest().getFields())
         paramTypes[i++] = data.getClass(param.schema());
       Method method = impl.getClass().getMethod(message.getName(), paramTypes);
+      method.setAccessible(true);
       return method.invoke(impl, (Object[])request);
     } catch (InvocationTargetException e) {
       throw (Exception)e.getTargetException();

Modified: avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/tool/Main.java Tue Jun 15 18:21:18 2010
@@ -25,6 +25,7 @@ import java.io.InputStream;
 
 import org.apache.avro.reflect.InduceSchemaTool;
 import org.apache.avro.specific.SpecificCompiler.SpecificCompilerTool;
+import org.apache.avro.mapred.tether.TetherJob;
 
 /** Command-line driver.*/
 public class Main {
@@ -47,7 +48,8 @@ public class Main {
         new DataFileGetSchemaTool(),
         new GenAvroTool(),
         new RpcReceiveTool(),
-        new RpcSendTool()
+        new RpcSendTool(),
+        new TetherJob()
         }) {
       Tool prev = tools.put(tool.getName(), tool);
       if (prev != null) {

Modified: avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/util/Utf8.java Tue Jun 15 18:21:18 2010
@@ -80,9 +80,9 @@ public class Utf8 implements Comparable<
   }
 
   public int hashCode() {
-    int hash = length;
+    int hash = 0;
     for (int i = 0; i < this.length; i++)
-      hash += bytes[i] & 0xFF;
+      hash = hash*31 + bytes[i];
     return hash;
   }
 

Modified: avro/trunk/lang/java/src/test/bin/test_tools.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/bin/test_tools.sh?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/bin/test_tools.sh (original)
+++ avro/trunk/lang/java/src/test/bin/test_tools.sh Tue Jun 15 18:21:18 2010
@@ -80,6 +80,12 @@ $CMD tojson $TMPDIR/data_file_write.avro
 $CMD getschema $TMPDIR/data_file_write.avro \
   | cmp -s - <(echo '"string"')
 ######################################################################
+# Test tethered mapred
+$CMD tether --in build/test/mapred/in --out build/test/mapred/tout --outschema ../../share/test/schemas/WordCount.avsc --program build/test/wordcount.jar
+$CMD tojson build/test/mapred/tout/part-00000.avro \
+  | cmp -s - <($CMD tojson build/test/mapred/out/part-00000.avro)
+
+######################################################################
 
 $CMD 2>&1 | grep -q "Available tools:"
 $CMD doesnotexist 2>&1 | grep -q "Available tools:"

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestCompare.java Tue Jun 15 18:21:18 2010
@@ -54,8 +54,10 @@ public class TestCompare {
 
   @Test
   public void testString() throws Exception {
+    check("\"string\"", new Utf8(""), new Utf8("a"));
     check("\"string\"", new Utf8("a"), new Utf8("b"));
     check("\"string\"", new Utf8("a"), new Utf8("ab"));
+    check("\"string\"", new Utf8("ab"), new Utf8("b"));
   }
 
   @Test
@@ -219,6 +221,21 @@ public class TestCompare {
     assert(!o2.equals(null));
 
     assert(o1.hashCode() != o2.hashCode());
+
+    // check BinaryData.hashCode against Object.hashCode
+    if (schema.getType() != Schema.Type.ENUM) {
+      assertEquals(o1.hashCode(),
+                   BinaryData.hashCode(b1, 0, b1.length, schema));
+      assertEquals(o2.hashCode(),
+                   BinaryData.hashCode(b2, 0, b2.length, schema));
+    }
+
+    // check BinaryData.hashCode against GenericData.hashCode
+    assertEquals(comparator.hashCode(o1, schema),
+                 BinaryData.hashCode(b1, 0, b1.length, schema));
+    assertEquals(comparator.hashCode(o2, schema),
+                 BinaryData.hashCode(b2, 0, b2.length, schema));
+
   }
 
   @SuppressWarnings(value="unchecked")

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java?rev=954998&r1=954997&r2=954998&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountSpecific.java Tue Jun 15 18:21:18 2010
@@ -79,29 +79,25 @@ public class TestWordCountSpecific {
     String dir = System.getProperty("test.dir", ".") + "/mapred";
     Path outputPath = new Path(dir + "/out");
     
-    try {
-      WordCountUtil.writeLinesFile();
-  
-      job.setJobName("wordcount");
-   
-      AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
-      AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
-  
-      job.setMapperClass(MapImpl.class);        
-      job.setCombinerClass(ReduceImpl.class);
-      job.setReducerClass(ReduceImpl.class);
-  
-      FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
-      FileOutputFormat.setOutputPath(job, outputPath);
-      FileOutputFormat.setCompressOutput(job, true);
-  
-      JobClient.runJob(job);
-  
-      WordCountUtil.validateCountsFile();
-    } finally {
-      outputPath.getFileSystem(job).delete(outputPath);
-    }
-
+    outputPath.getFileSystem(job).delete(outputPath);
+    WordCountUtil.writeLinesFile();
+    
+    job.setJobName("wordcount");
+    
+    AvroJob.setInputSpecific(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSpecific(job, WordCount.SCHEMA$);
+    
+    job.setMapperClass(MapImpl.class);        
+    job.setCombinerClass(ReduceImpl.class);
+    job.setReducerClass(ReduceImpl.class);
+    
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+    FileOutputFormat.setCompressOutput(job, true);
+    
+    JobClient.runJob(job);
+    
+    WordCountUtil.validateCountsFile();
   }
 
 }

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTask.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRequestor;
+
+/** Base class for Java tether mapreduce programs.  Useless except for testing,
+ * since it's already possible to write Java MapReduce programs without
+ * tethering.  Also serves as an example of how a framework may be
+ * implemented. */
+public abstract class TetherTask<IN,MID,OUT> {
+  static final Logger LOG = LoggerFactory.getLogger(TetherTask.class);
+
+  private Transceiver clientTransceiver;
+  private OutputProtocol outputClient;
+
+  private TaskType taskType;
+  private int partitions;
+
+  private DecoderFactory decoderFactory = DecoderFactory.defaultFactory();
+  private BinaryDecoder decoder;
+
+  private SpecificDatumReader<IN> inReader;
+  private SpecificDatumReader<MID> midReader;
+  private IN inRecord;
+  private MID midRecord;
+  private MID midRecordSpare;
+  private Collector<MID> midCollector;
+  private Collector<OUT> outCollector;
+
+  private static class Buffer extends ByteArrayOutputStream {
+    public ByteBuffer data() {
+      return ByteBuffer.wrap(buf, 0, count);
+    }
+  }
+
+  /** Collector for map and reduce output values. */
+  public class Collector<T> {
+    private SpecificDatumWriter<T> writer;
+    private Buffer buffer = new Buffer();
+    private BinaryEncoder encoder = new BinaryEncoder(buffer);
+    
+    private Collector(Schema schema) {
+      this.writer = new SpecificDatumWriter<T>(schema);
+    }
+
+    /** Collect a map or reduce output value. */
+    public void collect(T record) throws IOException {
+      buffer.reset();
+      writer.write(record, encoder);
+      outputClient.output(buffer.data());
+    }
+    
+    /** Collect a pre-partitioned map output value. */
+    public void collect(T record, int partition) throws IOException {
+      buffer.reset();
+      writer.write(record, encoder);
+      outputClient.outputPartitioned(partition, buffer.data());
+    }
+  }
+
+  void open(int inputPort) throws IOException {
+    // open output client, connecting to parent
+    String clientPortString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
+    if (clientPortString == null)
+      throw new RuntimeException("AVRO_TETHER_OUTPUT_PORT env var is null");
+    int clientPort = Integer.parseInt(clientPortString);
+    this.clientTransceiver =
+      new SocketTransceiver(new InetSocketAddress(clientPort));
+    this.outputClient = (OutputProtocol)
+      SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+
+    // send inputPort to parent
+    outputClient.configure(inputPort);
+  }
+
+  void configure(TaskType taskType, Utf8 inSchemaText, Utf8 outSchemaText) {
+    this.taskType = taskType;
+    try {
+      Schema inSchema = Schema.parse(inSchemaText.toString());
+      Schema outSchema = Schema.parse(outSchemaText.toString());
+      switch (taskType) {
+      case MAP:
+        this.inReader = new SpecificDatumReader<IN>(inSchema);
+        this.midCollector = new Collector<MID>(outSchema);
+        break;
+      case REDUCE:
+        this.midReader = new SpecificDatumReader<MID>(inSchema);
+        this.outCollector = new Collector<OUT>(outSchema);
+        break;
+      }
+    } catch (Throwable e) {
+      fail(e.toString());
+    }
+  }
+
+  void partitions(int partitions) { this.partitions = partitions; }
+
+  /** Return the number of map output partitions of this job. */
+  public int partitions() { return partitions; }
+
+  void input(ByteBuffer data, long count) {
+    try {
+      decoder = decoderFactory.createBinaryDecoder(data.array(), decoder);
+      for (long i = 0; i < count; i++) {
+        switch (taskType) {
+        case MAP:
+          inRecord = inReader.read(inRecord, decoder);
+          map(inRecord, midCollector);
+          break;
+        case REDUCE:
+          MID prev = midRecord;
+          midRecord = midReader.read(midRecordSpare, decoder);
+          if (prev != null && !midRecord.equals(prev))
+            reduceFlush(prev, outCollector);
+          reduce(midRecord, outCollector);
+          midRecordSpare = prev;
+          break;
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failing: "+e, e);
+      fail(e.toString());
+    }
+  }
+
+  void complete() {
+    if (taskType == TaskType.REDUCE && midRecord != null)
+      try {
+        reduceFlush(midRecord, outCollector);
+      } catch (Throwable e) {
+        LOG.warn("failing: "+e, e);
+        fail(e.toString());
+      }
+    outputClient.complete();
+  }
+
+  /** Called with input values to generate intermediate values. */
+  public abstract void map(IN record, Collector<MID> collector)
+    throws IOException;
+  /** Called with sorted intermediate values. */
+  public abstract void reduce(MID record, Collector<OUT> collector)
+    throws IOException;
+  /** Called with the last intermediate value in each equivalence run. */
+  public abstract void reduceFlush(MID record, Collector<OUT> collector)
+    throws IOException;
+
+  /** Call to update task status. */
+  public void status(String message) {
+    outputClient.status(new Utf8(message));
+  }
+
+  /** Call to increment a counter. */
+  public void count(String group, String name, long amount) {
+    outputClient.count(new Utf8(group), new Utf8(name), amount);
+  }
+
+  /** Call to fail the task. */
+  public void fail(String message) {
+    outputClient.fail(new Utf8(message));
+    close();
+  }
+
+  void close() {
+    if (clientTransceiver != null)
+      try {
+        clientTransceiver.close();
+      } catch (IOException e) {}                  // ignore
+  }
+
+}

Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java?rev=954998&view=auto
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java (added)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java Tue Jun 15 18:21:18 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.specific.SpecificResponder;
+
+/** Java implementation of a tether executable.  Useless except for testing,
+ * since it's already possible to write Java MapReduce programs without
+ * tethering.  Also serves as an example of how a framework may be
+ * implemented. */
+public class TetherTaskRunner implements InputProtocol {
+  static final Logger LOG = LoggerFactory.getLogger(TetherTaskRunner.class);
+
+  private SocketServer inputServer;
+  private TetherTask task;
+
+  public TetherTaskRunner(TetherTask task) throws IOException {
+    this.task = task;
+
+    // start input server
+    this.inputServer = new SocketServer
+      (new SpecificResponder(InputProtocol.class, this),
+       new InetSocketAddress(0));
+
+    // open output to parent
+    task.open(inputServer.getPort());
+  }
+
+  @Override public void configure(TaskType taskType,
+                                  Utf8 inSchema,
+                                  Utf8 outSchema) {
+    LOG.info("got configure");
+    task.configure(taskType, inSchema, outSchema);
+  }
+
+  @Override public synchronized void input(ByteBuffer data, long count) {
+    task.input(data, count);
+  }
+
+  @Override public void partitions(int partitions) {
+    task.partitions(partitions);
+  }
+
+  @Override public void abort() {
+    LOG.info("got abort");
+    close();
+  }
+
+  @Override public synchronized void complete() {
+    LOG.info("got input complete");
+    task.complete();
+    close();
+  }
+
+  /** Wait for task to complete. */
+  public void join() throws InterruptedException {
+    inputServer.join();
+  }
+
+  private void close() {
+    task.close();
+    if (inputServer != null)
+      inputServer.close();
+  }
+}