Posted to commits@avro.apache.org by cu...@apache.org on 2014/10/07 16:02:18 UTC

svn commit: r1629897 [1/2] - in /avro/trunk: ./ lang/java/ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/ lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/ lang/java/tools/ lang/java/tools/src/main/java/org/...

Author: cutting
Date: Tue Oct  7 14:02:17 2014
New Revision: 1629897

URL: http://svn.apache.org/r1629897
Log:
AVRO-570. Python: Add connector for tethered mapreduce. Contributed by Jeremy Lewi and Steven Willis.

Added:
    avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java   (with props)
    avro/trunk/lang/py/src/avro/tether/
    avro/trunk/lang/py/src/avro/tether/__init__.py   (with props)
    avro/trunk/lang/py/src/avro/tether/tether_task.py   (with props)
    avro/trunk/lang/py/src/avro/tether/tether_task_runner.py   (with props)
    avro/trunk/lang/py/src/avro/tether/util.py   (with props)
    avro/trunk/lang/py/test/mock_tether_parent.py   (with props)
    avro/trunk/lang/py/test/set_avro_test_path.py   (with props)
    avro/trunk/lang/py/test/test_tether_task.py   (with props)
    avro/trunk/lang/py/test/test_tether_task_runner.py   (with props)
    avro/trunk/lang/py/test/test_tether_word_count.py   (with props)
    avro/trunk/lang/py/test/word_count_task.py   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/pom.xml
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
    avro/trunk/lang/java/pom.xml
    avro/trunk/lang/java/tools/pom.xml
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java
    avro/trunk/lang/py/   (props changed)
    avro/trunk/lang/py/build.xml
    avro/trunk/lang/py/test/test_datafile.py
    avro/trunk/lang/py/test/test_datafile_interop.py
    avro/trunk/lang/py/test/test_io.py
    avro/trunk/lang/py/test/test_ipc.py
    avro/trunk/lang/py/test/test_schema.py

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Oct  7 14:02:17 2014
@@ -23,6 +23,9 @@ Trunk (not yet released)
 
     AVRO-1502. Java: Generated classes now implement Serializable. (cutting)
 
+    AVRO-570. Python: Add connector for tethered mapreduce.
+    (Jeremy Lewi and Steven Willis via cutting)    
+
   OPTIMIZATIONS
 
   IMPROVEMENTS

Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Tue Oct  7 14:02:17 2014
@@ -140,6 +140,11 @@
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-mapper-asl</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>${commons-codec.version}</version>
+    </dependency>
   </dependencies>
   
   <profiles>

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java Tue Oct  7 14:02:17 2014
@@ -41,6 +41,7 @@ public class TetherJob extends Configure
   public static final String TETHER_EXEC="avro.tether.executable";
   public static final String TETHER_EXEC_ARGS="avro.tether.executable_args";
   public static final String TETHER_EXEC_CACHED="avro.tether.executable_cached";
+  public static final String TETHER_PROTOCOL="avro.tether.protocol";
   
   /** Get the URI of the application's executable. */
   public static URI getExecutable(JobConf job) {
@@ -61,22 +62,44 @@ public class TetherJob extends Configure
    * and provides the mapper/reducer). 
    * @param job - Job
    * @param executable - The URI of the executable
-   * @param args - A string of additional arguments
+   * @param args - List of additional arguments; null if there are no arguments
    * @param cached - If true, the executable URI is cached using DistributedCache
    *               - if false its not cached. I.e if the file is already stored on each local file system
    *                or if its on a NFS share
    */
   public static void setExecutable(JobConf job, File executable, List<String> args, boolean cached) {
         job.set(TETHER_EXEC, executable.toString());
-        StringBuilder sb = new StringBuilder();
-        for (String a : args) {
-          sb.append(a);
-          sb.append('\n');
+        if (args != null){
+          StringBuilder sb = new StringBuilder();
+          for (String a : args) {
+            sb.append(a);
+            sb.append('\n');
+          }
+          job.set(TETHER_EXEC_ARGS, sb.toString());
         }
-        job.set(TETHER_EXEC_ARGS, sb.toString());
         job.set(TETHER_EXEC_CACHED,  (new Boolean(cached)).toString());
   }
 
+  /**
+   * Extract from the job configuration an instance of the TetheredProcess.Protocol
+   * enumeration representing the transport protocol to use for communication
+   * @param job - job configuration
+   * @return the transport protocol; NONE if no protocol is set
+   */
+  public static TetheredProcess.Protocol getProtocol(JobConf job) {
+
+    if (job.get(TetherJob.TETHER_PROTOCOL)==null) {
+      return TetheredProcess.Protocol.NONE;
+    } else if (job.get(TetherJob.TETHER_PROTOCOL).equals("http")) {
+      return TetheredProcess.Protocol.HTTP;
+    } else if (job.get(TetherJob.TETHER_PROTOCOL).equals("sasl")) {
+      return TetheredProcess.Protocol.SASL;
+    } else {
+      throw new RuntimeException("Unknown value for protocol: " +job.get(TetherJob.TETHER_PROTOCOL));
+    }
+
+  }
+
   /** Submit a job to the map/reduce cluster. All of the necessary
    * modifications to the job to run under tether are made to the
    * configuration.
@@ -92,6 +115,24 @@ public class TetherJob extends Configure
     return new JobClient(conf).submitJob(conf);
   }
   
+  /**
+   * Sets which transport protocol (e.g. http or sasl) is used to communicate
+   * between the parent and the subprocess
+   *
+   * @param job - job configuration
+   * @param proto - String identifying the protocol; currently 'http' or 'sasl'
+   */
+  public static void setProtocol(JobConf job, String proto) throws IOException {
+    proto=proto.trim().toLowerCase();
+
+    if (!(proto.equals("http") || proto.equals("sasl"))) {
+      throw new IOException("protocol must be 'http' or 'sasl'");
+    }
+
+    job.set(TETHER_PROTOCOL,proto);
+
+  }
+
   private static void setupTetherJob(JobConf job) throws IOException {
     job.setMapRunnerClass(TetherMapRunner.class);
     job.setPartitionerClass(TetherPartitioner.class);
@@ -107,6 +148,11 @@ public class TetherJob extends Configure
     // set the map output key class to TetherData
     job.setMapOutputKeyClass(TetherData.class);
     
+    // if the protocol isn't set, default to sasl
+    if (job.getStrings(TETHER_PROTOCOL)==null) {
+      job.set(TETHER_PROTOCOL, "sasl");
+    }
+
     // add TetherKeySerialization to io.serializations
     Collection<String> serializations =
       job.getStringCollection("io.serializations");
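
The getProtocol/setProtocol additions above give TetherJob an explicit transport setting alongside the existing setExecutable API. A minimal driver sketch, not part of this patch, showing how these methods fit together (the executable path and script name are hypothetical; input/output paths and schemas still need to be configured as in TestWordCountTether below):

    import java.io.File;
    import java.util.Arrays;

    import org.apache.avro.mapred.tether.TetherJob;
    import org.apache.hadoop.mapred.JobConf;

    public class TetherDriverSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // choose the transport; if setProtocol() is never called, setupTetherJob defaults to "sasl"
        TetherJob.setProtocol(job, "http");
        // the args list may now be null; the executable and script below are hypothetical
        TetherJob.setExecutable(job, new File("/usr/bin/python"),
            Arrays.asList("word_count_task.py"),
            false /* do not distribute the executable via DistributedCache */);
        // ... set input/output paths and Avro schemas as usual, then:
        TetherJob.runJob(job);
      }
    }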

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Tue Oct  7 14:02:17 2014
@@ -80,6 +80,7 @@ class TetherKeySerialization
 
     public void serialize(TetherData datum) throws IOException {
       encoder.writeBytes(datum.buffer());
+      encoder.flush(); //Flush shouldn't be required. Might be a bug in AVRO.
     }
 
     public void close() throws IOException {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java Tue Oct  7 14:02:17 2014
@@ -36,7 +36,7 @@ import org.apache.avro.mapred.AvroJob;
 class TetherMapRunner
   extends MapRunner<TetherData, NullWritable, TetherData, NullWritable> {
 
-  static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
 
   private JobConf job;
   private TetheredProcess process;
@@ -54,11 +54,13 @@ class TetherMapRunner
       process = new TetheredProcess(job, collector, reporter);
 
       // configure it
+      LOG.info("send configure to subprocess for map task");
       process.inputClient.configure
         (TaskType.MAP, 
          job.get(AvroJob.INPUT_SCHEMA),
          AvroJob.getMapOutputSchema(job).toString());
          
+      LOG.info("send partitions to subprocess for map task");
       process.inputClient.partitions(job.getNumReduceTasks());
 
       // run map
@@ -72,6 +74,7 @@ class TetherMapRunner
         if (process.outputService.isFinished())
           break;
       }
+      LOG.info("send complete to subprocess for map task");
       process.inputClient.complete();
 
       // wait for completion

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java Tue Oct  7 14:02:17 2014
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class TetherOutputService implements OutputProtocol {
   private Reporter reporter;
@@ -31,6 +33,11 @@ class TetherOutputService implements Out
   private boolean complete;
   private String error;
 
+  private static final Logger LOG = LoggerFactory.getLogger(TetherOutputService.class);
+
+  // timeout, in milliseconds, when waiting for messages from the subprocess.
+  // what is a good value?
+  public static final long TIMEOUT=10*1000;
   public TetherOutputService(OutputCollector<TetherData,NullWritable> collector,
                              Reporter reporter) {
     this.reporter = reporter;
@@ -38,15 +45,20 @@ class TetherOutputService implements Out
   }
 
   public synchronized void configure(int inputPort) {
-    TetherMapRunner.LOG.info("got input port from child");
+    LOG.info("got input port from child: inputport="+inputPort);
     this.inputPort = inputPort;
     notify();
   }
 
-  public synchronized int inputPort() throws InterruptedException {
-    while (inputPort == 0) {
-      TetherMapRunner.LOG.info("waiting for input port from child");
-      wait();
+  public synchronized int inputPort() throws Exception {
+    if (inputPort==0) {
+      LOG.info("waiting for input port from child");
+      wait(TIMEOUT);
+    }
+
+    if (inputPort==0) {
+      LOG.error("Parent process timed out waiting for subprocess to send input port. Check the job log files for more info.");
+      throw new Exception("Parent process timed out waiting for subprocess to send input port");
     }
     return inputPort;
   }
@@ -55,7 +67,7 @@ class TetherOutputService implements Out
     try {
       collector.collect(new TetherData(datum), NullWritable.get());
     } catch (Throwable e) {
-      TetherMapRunner.LOG.warn("Error: "+e, e);
+      LOG.warn("Error: "+e, e);
       synchronized (this) {
         error = e.toString();
       }
@@ -75,13 +87,13 @@ class TetherOutputService implements Out
   }
 
   public synchronized void fail(String message) {
-    TetherMapRunner.LOG.warn("Failing: "+message);
+    LOG.warn("Failing: "+message);
     error = message.toString();
     notify();
   }
 
   public synchronized void complete() {
-    TetherMapRunner.LOG.info("got task complete");
+    LOG.info("got task complete");
     complete = true;
     notify();
   }
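
The inputPort() change above replaces an unbounded wait with a single timed wait, so the parent fails fast if the subprocess never reports its port. A stripped-down sketch of that wait/notify handshake (names are illustrative, not from the patch):

    public class PortHolder {
      // mirrors TetherOutputService.TIMEOUT; the value is in milliseconds
      private static final long TIMEOUT = 10 * 1000;
      private int inputPort = 0;

      // called on the RPC thread when the subprocess reports its port
      public synchronized void configure(int port) {
        this.inputPort = port;
        notify();
      }

      // called on the task thread; throws instead of blocking forever
      public synchronized int awaitInputPort() throws Exception {
        if (inputPort == 0) {
          wait(TIMEOUT);             // returns on notify() or after TIMEOUT ms
        }
        if (inputPort == 0) {
          throw new Exception("timed out waiting for subprocess to send input port");
        }
        return inputPort;
      }
    }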

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java Tue Oct  7 14:02:17 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -43,13 +44,15 @@ import org.apache.avro.ipc.SaslSocketSer
 import org.apache.avro.ipc.SaslSocketTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class TetheredProcess  {
 
-  static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+  static final Logger LOG = LoggerFactory.getLogger(TetheredProcess.class);
 
   private JobConf job;
 
@@ -59,15 +62,44 @@ class TetheredProcess  {
   Transceiver clientTransceiver;
   InputProtocol inputClient;
 
+  /**
+   * Enumeration defining which transport protocol to use to communicate between
+   * the map/reduce java daemons and the tethered process.
+   */
+  public enum Protocol {HTTP,SASL,NONE};
+
+  //which protocol we are using
+  Protocol proto;
+
   public TetheredProcess(JobConf job,
                           OutputCollector<TetherData, NullWritable> collector,
                           Reporter reporter) throws Exception {
     try {
       // start server
       this.outputService = new TetherOutputService(collector, reporter);
-      this.outputServer = new SaslSocketServer
-        (new SpecificResponder(OutputProtocol.class, outputService),
-         new InetSocketAddress(0));
+
+      proto=TetherJob.getProtocol(job);
+
+      InetSocketAddress iaddress;
+      switch (proto) {
+      case SASL:
+        iaddress=new InetSocketAddress(0);
+        this.outputServer = new SaslSocketServer
+            (new SpecificResponder(OutputProtocol.class, outputService),
+                iaddress);
+        break;
+      case HTTP:
+        iaddress=new InetSocketAddress(0);
+        //set it up for http
+        this.outputServer= new  HttpServer
+            (new SpecificResponder(OutputProtocol.class, outputService),
+                iaddress.getPort());
+        break;
+      case NONE:
+      default:
+        throw new RuntimeException("No transport protocol was specified in the job configuration");
+      }
+
       outputServer.start();
       
       // start sub-process, connecting back to server
@@ -85,8 +117,18 @@ class TetheredProcess  {
         LOG.error("Could not start subprocess");
         throw new RuntimeException("Could not start subprocess");
       }
-      this.clientTransceiver
-        = new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+      // open client, connecting to sub-process
+      switch (proto) {
+      case SASL:
+        this.clientTransceiver =new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+        break;
+      case HTTP:
+        this.clientTransceiver =new HttpTransceiver(new URL("http://127.0.0.1:"+outputService.inputPort()));
+        break;
+      default:
+        throw new RuntimeException("Error: code to handle this protocol is not implemented");
+      }
+
       this.inputClient =
         SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
 
@@ -131,17 +173,21 @@ class TetheredProcess  {
     command.add(executable);
 
     // Add the executable arguments. We assume the arguments are separated by
-    // spaces so we split the argument string based on spaces and add each
+    // newlines so we split the argument string based on newlines and add each
     // token to command We need to do it this way because
     // TaskLog.captureOutAndError will put quote marks around each argument so
     // if we pass a single string containing all arguments we get quoted
     // incorrectly
     String args=job.get(TetherJob.TETHER_EXEC_ARGS);
-    String[] aparams=args.split("\n");
-    for (int i=0;i<aparams.length; i++){            
-      aparams[i]=aparams[i].trim();
-      if (aparams[i].length()>0){
-        command.add(aparams[i]);
+
+    // args might be null if TETHER_EXEC_ARGS wasn't set.
+    if (args != null) {
+      String[] aparams=args.split("\n");
+      for (int i=0;i<aparams.length; i++){
+        aparams[i]=aparams[i].trim();
+        if (aparams[i].length()>0){
+          command.add(aparams[i]);
+        }
       }
     }
 
@@ -163,6 +209,18 @@ class TetheredProcess  {
     env.put("AVRO_TETHER_OUTPUT_PORT",
             Integer.toString(outputServer.getPort()));
 
+    // add an environment variable to specify what protocol to use for communication
+    env.put("AVRO_TETHER_PROTOCOL", job.get(TetherJob.TETHER_PROTOCOL));
+
+    // log an info message showing the full command
+    String imsg="";
+    for (int i=0; i<command.size();i++) {
+      imsg+=command.get(i)+" ";
+    }
+    LOG.info("TetheredProcess.startSubprocess: command: "+imsg);
+    LOG.info("TetheredProcess.startSubprocess: stdout logged to: " + stdout.toString());
+    LOG.info("TetheredProcess.startSubprocess: stderr logged to: " + stderr.toString());
+
     // start child process
     ProcessBuilder builder = new ProcessBuilder(command);
     System.out.println(command);
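
With this change the subprocess learns both the callback port and the transport from its environment. A minimal sketch of that launch step, assuming a hypothetical executable path and an example port number:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class LaunchSketch {
      public static void main(String[] args) throws Exception {
        List<String> command = Arrays.asList("/path/to/tether_executable");   // hypothetical path
        ProcessBuilder builder = new ProcessBuilder(command);
        Map<String, String> env = builder.environment();
        env.put("AVRO_TETHER_OUTPUT_PORT", Integer.toString(12345));  // port of the parent's output server (example value)
        env.put("AVRO_TETHER_PROTOCOL", "http");                      // or "sasl"
        Process child = builder.start();
        child.waitFor();
      }
    }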

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java Tue Oct  7 14:02:17 2014
@@ -43,13 +43,19 @@ import org.apache.avro.Schema;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.specific.SpecificDatumReader;
 
+/**
+ * See also TestTetherTool for an example of how to submit jobs using the tether tool.
+ *
+ */
 public class TestWordCountTether {
 
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testJob() throws Exception {
-
+  /**
+   * Run a word count job using the given transport protocol
+   * @param proto - the transport protocol ("sasl" or "http")
+   */
+  private void _runjob(String proto) throws Exception {
+    // System.out.println(System.getProperty("java.class.path").replace(":", "\n"));
     System.out.println(System.getProperty("java.class.path"));
     JobConf job = new JobConf();
     String dir = System.getProperty("test.dir", ".") + "/mapred";
@@ -80,6 +86,7 @@ public class TestWordCountTether {
     AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
     job.set(AvroJob.OUTPUT_SCHEMA, outscheme.toString());
 
+    TetherJob.setProtocol(job, proto);
     TetherJob.runJob(job);
 
     // validate the output
@@ -100,5 +107,23 @@ public class TestWordCountTether {
 
   }
 
+  /**
+   * Test the job using the sasl protocol
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJob() throws Exception {
+      _runjob("sasl");
+  }
 
+  /**
+   * Test the job using the http protocol
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testhtp() throws Exception {
+    _runjob("http");
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java Tue Oct  7 14:02:17 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.net.InetSocketAddress;
+import java.net.URL;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.avro.Schema;
+import org.apache.avro.ipc.HttpTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.SaslSocketTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -61,6 +63,8 @@ public abstract class TetherTask<IN,MID,
   private Collector<MID> midCollector;
   private Collector<OUT> outCollector;
 
+  private TetheredProcess.Protocol proto;
+
   private static class Buffer extends ByteArrayOutputStream {
     public ByteBuffer data() {
       return ByteBuffer.wrap(buf, 0, count);
@@ -98,12 +102,37 @@ public abstract class TetherTask<IN,MID,
   void open(int inputPort) throws IOException {
     // open output client, connecting to parent
     String clientPortString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
+    String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
     if (clientPortString == null)
       throw new RuntimeException("AVRO_TETHER_OUTPUT_PORT env var is null");
     int clientPort = Integer.parseInt(clientPortString);
-    this.clientTransceiver =
+
+    if (protocol == null) {
+      throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
+    }
+
+    protocol=protocol.trim().toLowerCase();
+
+    if (protocol.equals("http")) {
+      proto=TetheredProcess.Protocol.HTTP;
+    } else if (protocol.equals("sasl")) {
+      proto=TetheredProcess.Protocol.SASL;
+    } else {
+      throw new RuntimeException("AVRO_TETHER_PROTOCOL="+protocol+" but this protocol is unsupported");
+    }
+
+    switch (proto) {
+    case SASL:
+      this.clientTransceiver =
       new SaslSocketTransceiver(new InetSocketAddress(clientPort));
-    this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+      this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+      break;
+
+    case HTTP:
+      this.clientTransceiver =new HttpTransceiver(new URL("http://127.0.0.1:"+clientPort));
+      this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+      break;
+    }
 
     // send inputPort to parent
     outputClient.configure(inputPort);
@@ -167,7 +196,9 @@ public abstract class TetherTask<IN,MID,
         LOG.warn("failing: "+e, e);
         fail(e.toString());
       }
+    LOG.info("TetherTask: Sending complete to parent process.");
     outputClient.complete();
+    LOG.info("TetherTask: Done sending complete to parent process.");
   }
 
   /** Called with input values to generate intermediate values. */
@@ -197,6 +228,7 @@ public abstract class TetherTask<IN,MID,
   }
 
   void close() {
+    LOG.info("Closing the transceiver");
     if (clientTransceiver != null)
       try {
         clientTransceiver.close();
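
The child side of that handshake, as TetherTask.open() now does it, amounts to reading the two environment variables and opening the matching transceiver back to the parent. A condensed sketch of that selection logic (not part of the patch):

    import java.net.InetSocketAddress;
    import java.net.URL;

    import org.apache.avro.ipc.HttpTransceiver;
    import org.apache.avro.ipc.SaslSocketTransceiver;
    import org.apache.avro.ipc.Transceiver;

    public class ParentConnectionSketch {
      static Transceiver openToParent() throws Exception {
        String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
        String portString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
        if (protocol == null || portString == null) {
          throw new RuntimeException("AVRO_TETHER_PROTOCOL or AVRO_TETHER_OUTPUT_PORT env var is null");
        }
        int clientPort = Integer.parseInt(portString);
        protocol = protocol.trim().toLowerCase();
        if (protocol.equals("http")) {
          return new HttpTransceiver(new URL("http://127.0.0.1:" + clientPort));
        } else if (protocol.equals("sasl")) {
          return new SaslSocketTransceiver(new InetSocketAddress(clientPort));
        }
        throw new RuntimeException("unsupported protocol: " + protocol);
      }
    }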

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java Tue Oct  7 14:02:17 2014
@@ -20,13 +20,19 @@ package org.apache.avro.mapred.tether;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
 import org.apache.avro.ipc.SaslSocketServer;
+import org.apache.avro.ipc.SaslSocketTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.ipc.Server;
 
 /** Java implementation of a tether executable.  Useless except for testing,
  * since it's already possible to write Java MapReduce programs without
@@ -35,16 +41,52 @@ import org.apache.avro.ipc.specific.Spec
 public class TetherTaskRunner implements InputProtocol {
   static final Logger LOG = LoggerFactory.getLogger(TetherTaskRunner.class);
 
-  private SaslSocketServer inputServer;
+  private Server inputServer;
   private TetherTask task;
 
+  private TetheredProcess.Protocol proto;
+
   public TetherTaskRunner(TetherTask task) throws IOException {
     this.task = task;
 
-    // start input server
-    this.inputServer = new SaslSocketServer
+    //determine what protocol we are using
+    String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
+    if (protocol == null) {
+      throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
+    }
+
+    protocol=protocol.trim().toLowerCase();
+
+    if (protocol.equals("http")) {
+      LOG.info("Use HTTP protocol");
+      proto=TetheredProcess.Protocol.HTTP;
+    } else if (protocol.equals("sasl")) {
+      LOG.info("Use SASL protocol");
+      proto=TetheredProcess.Protocol.SASL;
+    } else {
+      throw new RuntimeException("AVRO_TETHER_PROTOCOL="+protocol+" but this protocol is unsupported");
+    }
+
+    InetSocketAddress iaddress=new InetSocketAddress(0);
+
+    switch(proto) {
+    case SASL:
+      // start input server
+      this.inputServer = new SaslSocketServer
+      (new SpecificResponder(InputProtocol.class, this),
+          iaddress);
+      LOG.info("Started SaslSocketServer on port:"+iaddress.getPort());
+      break;
+
+    case HTTP:
+      this.inputServer=new  HttpServer
       (new SpecificResponder(InputProtocol.class, this),
-       new InetSocketAddress(0));
+          iaddress.getPort());
+
+      LOG.info("Started HttpServer on port:"+iaddress.getPort());
+      break;
+    }
+
     inputServer.start();
 
     // open output to parent
@@ -74,16 +116,19 @@ public class TetherTaskRunner implements
   @Override public synchronized void complete() {
     LOG.info("got input complete");
     task.complete();
-    close();
   }
 
   /** Wait for task to complete. */
   public void join() throws InterruptedException {
+    LOG.info("TetherTaskRunner: Start join.");
     inputServer.join();
+    LOG.info("TetherTaskRunner: Finish join.");
   }
 
   private void close() {
+    LOG.info("Closing the task");
     task.close();
+    LOG.info("Finished closing the task.");
     if (inputServer != null)
       inputServer.close();
   }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java Tue Oct  7 14:02:17 2014
@@ -24,11 +24,15 @@ import java.util.StringTokenizer;
 import org.apache.avro.mapred.Pair;
 import org.apache.avro.util.Utf8;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /** Example Java tethered mapreduce executable.  Implements map and reduce
  * functions for word count. */
 public class WordCountTask
   extends TetherTask<Utf8,Pair<Utf8,Long>,Pair<Utf8,Long>> {
   
+  static final Logger LOG = LoggerFactory.getLogger(WordCountTask.class);
   @Override public void map(Utf8 text, Collector<Pair<Utf8,Long>> collector)
     throws IOException {
     StringTokenizer tokens = new StringTokenizer(text.toString());
@@ -52,6 +56,7 @@ public class WordCountTask
 
   public static void main(String[] args) throws Exception {
     new TetherTaskRunner(new WordCountTask()).join();
+    LOG.info("WordCountTask finished");
   }
 
 }

Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Tue Oct  7 14:02:17 2014
@@ -58,8 +58,12 @@
     <velocity.version>1.7</velocity.version>
     <maven.version>2.0.10</maven.version>
     <ant.version>1.9.0</ant.version>
-    <commons-lang.version>2.6</commons-lang.version>
+    <commons-cli.version>1.2</commons-cli.version>
+    <commons-codec.version>1.9</commons-codec.version>
     <commons-compress.version>1.8.1</commons-compress.version>
+    <commons-httpclient.version>3.1</commons-httpclient.version>
+    <commons-lang.version>2.6</commons-lang.version>
+    <commons-logging.version>1.1.1</commons-logging.version>
     <tukaani.version>1.5</tukaani.version>
     <easymock.version>3.2</easymock.version>
     <hamcrest.version>1.3</hamcrest.version>
@@ -167,6 +171,18 @@
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-jar-plugin</artifactId>
           <version>${jar-plugin.version}</version>
+
+          <!--We want to be able to reuse the test jars for mapred
+              to test avro-tools
+              see http://maven.apache.org/guides/mini/guide-attached-tests.html
+          -->
+          <executions>
+            <execution>
+              <goals>
+                <goal>test-jar</goal>
+              </goals>
+            </execution>
+          </executions>
           <configuration>
             <archive>
               <manifest>

Modified: avro/trunk/lang/java/tools/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/pom.xml (original)
+++ avro/trunk/lang/java/tools/pom.xml Tue Oct  7 14:02:17 2014
@@ -117,6 +117,31 @@
       <artifactId>avro-mapred</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <!--For testing TetherTool we need the mapred test jar
+	because that contains the word count example.-->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>${commons-cli.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>${commons-httpclient.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>trevni-core</artifactId>

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java Tue Oct  7 14:02:17 2014
@@ -20,12 +20,9 @@ package org.apache.avro.tool;
 import java.io.File;
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.List;
 
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
 import org.apache.avro.Schema;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.tether.TetherJob;
@@ -34,6 +31,15 @@ import org.apache.hadoop.mapred.FileInpu
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+
+
 @SuppressWarnings("deprecation")
 public class TetherTool implements Tool {
   public TetherJob job;
@@ -52,38 +58,116 @@ public class TetherTool implements Tool 
   public int run(InputStream ins, PrintStream outs, PrintStream err,
       List<String> args) throws Exception {
 
-    OptionParser p = new OptionParser();
-    OptionSpec<File> exec = p
-        .accepts("program", "executable program, usually in HDFS")
-        .withRequiredArg().ofType(File.class);
-    OptionSpec<String> in = p.accepts("in", "comma-separated input paths")
-        .withRequiredArg().ofType(String.class);
-    OptionSpec<Path> out = p.accepts("out", "output directory")
-        .withRequiredArg().ofType(Path.class);
-    OptionSpec<File> outSchema = p.accepts("outschema", "output schema file")
-        .withRequiredArg().ofType(File.class);
-    OptionSpec<File> mapOutSchema = p
-        .accepts("outschemamap", "map output schema file, if different")
-        .withOptionalArg().ofType(File.class);
-    OptionSpec<Integer> reduces = p.accepts("reduces", "number of reduces")
-        .withOptionalArg().ofType(Integer.class);
+    String[] argarry = args.toArray(new String[0]);
+    Options opts = new Options();
+
+    Option helpopt = OptionBuilder.hasArg(false)
+        .withDescription("print this message")
+        .create("help");
+
+    Option inopt = OptionBuilder.hasArg()
+        .isRequired()
+        .withDescription("comma-separated input paths")
+        .create("in");
+
+    Option outopt = OptionBuilder.hasArg()
+        .isRequired()
+        .withDescription("The output path.")
+        .create("out");
+
+    Option pargs = OptionBuilder.hasArg()
+        .withDescription("A string containing the command line arguments to pass to the tethered process. String should be enclosed in quotes")
+        .create("exec_args");
+
+    Option popt = OptionBuilder.hasArg()
+        .isRequired()
+        .withDescription("executable program, usually in HDFS")
+        .create("program");
+
+    Option outscopt = OptionBuilder.withType(File.class).hasArg()
+        .isRequired()
+        .withDescription("schema file for output of reducer")
+        .create("outschema");
+
+    Option outscmapopt = OptionBuilder.withType(File.class).hasArg()
+        .withDescription("(optional) map output schema file, if different from outschema")
+        .create("outschemamap");
+
+    Option redopt = OptionBuilder.withType(Integer.class).hasArg()
+        .withDescription("(optional) number of reducers")
+        .create("reduces");
+
+    Option cacheopt = OptionBuilder.withType(Boolean.class).hasArg()
+        .withDescription("(optional) boolean indicating whether or not the executable should be distributed via distributed cache")
+        .create("exec_cached");
+
+    Option protoopt = OptionBuilder.hasArg()
+        .withDescription("(optional) specifies the transport protocol 'http' or 'sasl'")
+        .create("protocol");
+
+    opts.addOption(redopt);
+    opts.addOption(outscopt);
+    opts.addOption(popt);
+    opts.addOption(pargs);
+    opts.addOption(inopt);
+    opts.addOption(outopt);
+    opts.addOption(helpopt);
+    opts.addOption(outscmapopt);
+    opts.addOption(cacheopt);
+    opts.addOption(protoopt);
+
+    CommandLineParser parser = new GnuParser();
+
+    String[] genargs = null;
+    CommandLine line = null;
+    HelpFormatter formatter = new HelpFormatter();
 
     JobConf job = new JobConf();
 
     try {
-      OptionSet opts = p.parse(args.toArray(new String[0]));
-      FileInputFormat.addInputPaths(job, in.value(opts));
-      FileOutputFormat.setOutputPath(job, out.value(opts));
-      TetherJob.setExecutable(job, exec.value(opts));
-      job.set(AvroJob.OUTPUT_SCHEMA, Schema.parse(outSchema.value(opts))
-          .toString());
-      if (opts.hasArgument(mapOutSchema))
+      line = parser.parse(opts, argarry);
+
+      if (line.hasOption("help")) {
+        formatter.printHelp("tether", opts );
+        return 0;
+      }
+
+      genargs = line.getArgs();
+
+      FileInputFormat.addInputPaths(job, line.getOptionValue("in"));
+      FileOutputFormat.setOutputPath(job,new Path (line.getOptionValue("out")));
+
+      List<String> exargs = null;
+      Boolean cached = false;
+
+      if (line.hasOption("exec_args")) {
+        String[] splitargs = line.getOptionValue("exec_args").split(" ");
+        exargs = new ArrayList<String>();
+        for (String item: splitargs){
+          exargs.add(item);
+        }
+      }
+      if (line.hasOption("exec_cached")) {
+        cached = Boolean.parseBoolean(line.getOptionValue("exec_cached"));
+      }
+      TetherJob.setExecutable(job, new File(line.getOptionValue("program")), exargs, cached);
+
+      File outschema = (File)line.getParsedOptionValue("outschema");
+      job.set(AvroJob.OUTPUT_SCHEMA, Schema.parse(outschema).toString());
+      if (line.hasOption("outschemamap")) {
         job.set(AvroJob.MAP_OUTPUT_SCHEMA,
-            Schema.parse(mapOutSchema.value(opts)).toString());
-      if (opts.hasArgument(reduces))
-        job.setNumReduceTasks(reduces.value(opts));
-    } catch (Exception e) {
-      p.printHelpOn(err);
+            Schema.parse((File)line.getParsedOptionValue("outschemamap")).toString());
+      }
+      if (line.hasOption("reduces")) {
+        job.setNumReduceTasks((Integer)line.getParsedOptionValue("reduces"));
+      }
+      if (line.hasOption("protocol")) {
+        TetherJob.setProtocol(job, line.getOptionValue("protocol"));
+      }
+    }
+    catch (Exception exp) {
+      System.out.println("Unexpected exception: " + exp.getMessage());
+      formatter.printHelp("tether", opts );
       return -1;
     }
 

Added: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java (added)
+++ avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java Tue Oct  7 14:02:17 2014
@@ -0,0 +1,109 @@
+package org.apache.avro.tool;
+
+import static org.junit.Assert.assertEquals;
+
+import static java.util.Arrays.asList;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.FileWriter;
+
+
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.mapred.WordCountUtil;
+import org.apache.avro.mapred.tether.TetherJob;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+public class TestTetherTool {
+
+  /**
+   * Test that the tether tool works with the mapreduce example
+   *
+   * TODO: How can we ensure that when we run, the WordCountTether example has
+   * been properly compiled?
+   */
+  @Test
+  public void test() throws Exception {
+
+    // Create the schema files.
+    Schema outscheme = new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema();
+
+    // we need to write the schemas to a file
+    File midscfile = AvroTestUtil.tempFile(getClass(), "midschema.avpr");
+
+    FileWriter hf = null;
+    try {
+      hf =new FileWriter(midscfile);
+      hf.write(outscheme.toString());
+    }
+    finally {
+      if (hf != null) {
+        hf.close();
+      }
+    }
+
+    // Get the classpath to use as an argument.
+    String cp = System.getProperty("java.class.path");
+
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath);
+
+    // create the input file
+    WordCountUtil.writeLinesFile();
+
+    // Executable is java? Argument will be WordCountTask.java - Is the classpath
+    // set appropriately automatically?
+    java.net.URI exec = new java.net.URI("java");
+    //input path
+    String in = dir + "/in";
+
+    // create a string of the arguments
+    String execargs = "-classpath " + System.getProperty("java.class.path");
+    execargs += " org.apache.avro.mapred.tether.WordCountTask";
+
+    // Create a list of the arguments to pass to the tool's run method
+    java.util.List<String> runargs = new java.util.ArrayList<String> ();
+
+
+    runargs.addAll(java.util.Arrays.asList("--program", "java"));
+    runargs.addAll(asList("--exec_args", '"'+execargs+'"'));
+    runargs.addAll(asList("--exec_cached", "false"));
+    runargs.addAll(asList("--in", in));
+    runargs.addAll(asList("--out", outputPath.toString()));
+    runargs.addAll(asList("--outschema", midscfile.toString()));
+
+    TetherTool tool = new TetherTool();
+
+    tool.run(null, null, System.err, runargs);
+
+    // TODO:: We should probably do some validation
+    // validate the output
+    DatumReader<Pair<Utf8,Long>> reader = new SpecificDatumReader<Pair<Utf8,Long>>();
+    InputStream cin = new BufferedInputStream(new FileInputStream(WordCountUtil.COUNTS_FILE));
+    DataFileStream<Pair<Utf8,Long>> counts = new DataFileStream<Pair<Utf8,Long>>(cin,reader);
+    int numWords = 0;
+    for (Pair<Utf8,Long> wc : counts) {
+      assertEquals(wc.key().toString(),
+      WordCountUtil.COUNTS.get(wc.key().toString()), wc.value());
+      numWords++;
+    }
+    cin.close();
+    assertEquals(WordCountUtil.COUNTS.size(), numWords);
+  }
+}

Propchange: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: avro/trunk/lang/py/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Oct  7 14:02:17 2014
@@ -1,3 +1,4 @@
 avro.egg-info
 build
+userlogs
 MANIFEST

Modified: avro/trunk/lang/py/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/build.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/build.xml (original)
+++ avro/trunk/lang/py/build.xml Tue Oct  7 14:02:17 2014
@@ -77,6 +77,12 @@
       <fileset dir="${lib.dir}" />
     </copy>
 
+    <!--Copy the protocols used for tethering -->
+    <copy todir="${build.dir}/src/avro/tether">
+        <fileset dir="${share.schema.dir}/org/apache/avro/mapred/tether/">
+    <include name="*.avpr"/>
+        </fileset>
+    </copy>
     <!-- Inline the handshake schemas -->
     <copy file="${src.dir}/avro/ipc.py"
           toFile="${build.dir}/src/avro/ipc.py"
@@ -120,6 +126,16 @@
         <filter token="INTEROP_DATA_DIR" value="${interop.data.dir}"/>
       </filterset>
     </copy>
+
+    <!-- Inline the location of the tools jar -->
+    <copy file="${test.dir}/test_tether_word_count.py"
+          toFile="${build.dir}/test/test_tether_word_count.py"
+          overwrite="true">
+      <filterset>
+  <filter token="AVRO_VERSION" value="${avro.version}"/>
+  <filter token="TOPDIR" value="${basedir}"/>
+      </filterset>
+    </copy>
   </target>
 
   <target name="test"
@@ -135,6 +151,22 @@
     </py-test>
   </target>
 
+    <!--A target to run just the unit tests for tethered jobs.
+    -->
+    <target name="test-tether"
+          description="Run unit tests for a hadoop python-tethered job."
+          depends="build">
+    <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask"
+       classpathref="java.classpath"/>
+    <py-test python="${python}" pythonpathref="test.path">
+      <fileset dir="${build.dir}/test">
+        <include name="test_tether*.py"/>
+        <!--<exclude name="test_datafile_interop.py"/>-->
+      </fileset>
+    </py-test>
+  </target>
+
+
   <target name="interop-data-test"
           description="Run python interop data tests"
           depends="build">

Added: avro/trunk/lang/py/src/avro/tether/__init__.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/__init__.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/__init__.py (added)
+++ avro/trunk/lang/py/src/avro/tether/__init__.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,7 @@
+from .util import *
+from .tether_task import *
+from .tether_task_runner import *
+
+__all__=util.__all__
+__all__+=tether_task.__all__
+__all__+=tether_task_runner.__all__

Propchange: avro/trunk/lang/py/src/avro/tether/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py/src/avro/tether/tether_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/tether_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/tether_task.py (added)
+++ avro/trunk/lang/py/src/avro/tether/tether_task.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,498 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"]
+
+from avro import schema, protocol
+from avro import io as avio
+from avro import ipc
+
+import io as pyio
+import sys
+import os
+import traceback
+import logging
+import collections
+from StringIO import StringIO
+import threading
+
+
+# create protocol objects for the input and output protocols
+# The build process should copy InputProtocol.avpr and OutputProtocol.avpr
+# into the same directory as this module
+inputProtocol=None
+outputProtocol=None
+
+TaskType=None
+if (inputProtocol is None):
+  pfile=os.path.split(__file__)[0]+os.sep+"InputProtocol.avpr"
+
+  if not(os.path.exists(pfile)):
+    raise Exception("Could not locate the InputProtocol: {0} does not exist".format(pfile))
+
+  with file(pfile,'r') as hf:
+    prototxt=hf.read()
+
+  inputProtocol=protocol.parse(prototxt)
+
+  # use a named tuple to represent the tasktype enumeration
+  taskschema=inputProtocol.types_dict["TaskType"]
+  _ttype=collections.namedtuple("_tasktype",taskschema.symbols)
+  TaskType=_ttype(*taskschema.symbols)
+
+if (outputProtocol is None):
+  pfile=os.path.split(__file__)[0]+os.sep+"OutputProtocol.avpr"
+
+  if not(os.path.exists(pfile)):
+    raise Exception("Could not locate the OutputProtocol: {0} does not exist".format(pfile))
+
+  with file(pfile,'r') as hf:
+    prototxt=hf.read()
+
+  outputProtocol=protocol.parse(prototxt)
+
+class Collector(object):
+  """
+  Collector for map and reduce output values
+  """
+  def __init__(self,scheme=None,outputClient=None):
+    """
+
+    Parameters
+    ---------------------------------------------
+    scheme - The scheme for the datums to output - can be a json string
+           - or an instance of Schema
+    outputClient - The output client used to send messages to the parent
+    """
+
+    if not(isinstance(scheme,schema.Schema)):
+      scheme=schema.parse(scheme)
+
+    if (outputClient is None):
+      raise ValueError("output client can't be none.")
+
+    self.scheme=scheme
+    self.buff=StringIO()
+    self.encoder=avio.BinaryEncoder(self.buff)
+
+    self.datum_writer = avio.DatumWriter(writers_schema=self.scheme)
+    self.outputClient=outputClient
+
+  def collect(self,record,partition=None):
+    """Collect a map or reduce output value
+
+    Parameters
+    ------------------------------------------------------
+    record - The record to write
+    partition - Indicates the partition for a pre-partitioned map output
+              - currently not supported
+    """
+
+    self.buff.truncate(0)
+    self.datum_writer.write(record, self.encoder);
+    self.buff.flush();
+    self.buff.seek(0)
+
+    # delete all the data in the buffer
+    if (partition is None):
+
+      # TODO: Is there a more efficient way to read the data in self.buff?
+      # we could use self.buff.read() but that returns the byte array as a string
+      # will that work?  We can also use self.buff.readinto to read it into
+      # a bytearray but the byte array must be pre-allocated
+      # self.outputClient.output(self.buff.buffer.read())
+
+      #its not a StringIO
+      self.outputClient.request("output",{"datum":self.buff.read()})
+    else:
+      self.outputClient.request("outputPartitioned",{"datum":self.buff.read(),"partition":partition})
+
+
+
+def keys_are_equal(rec1,rec2,fkeys):
+  """Check if the "keys" in two records are equal. The key fields
+  are all fields for which order isn't marked ignore.
+
+  Parameters
+  -------------------------------------------------------------------------
+  rec1  - The first record
+  rec2 - The second record
+  fkeys - A list of the fields to compare
+  """
+
+  for f in fkeys:
+    if not(rec1[f]==rec2[f]):
+      return False
+
+  return True
+
+
+class HTTPRequestor(object):
+  """
+  This is a small requestor wrapper I created for the HTTP protocol.
+  Since the HTTP protocol isn't persistent, we need to instantiate
+  a new transceiver and a new requestor for each request.
+  But I wanted use of the requestor to be identical to that for
+  SocketTransceiver so that we can seamlessly switch between the two.
+  """
+
+  def __init__(self, server,port,protocol):
+    """
+    Instantiate the class.
+
+    Parameters
+    ----------------------------------------------------------------------
+    server - The server hostname
+    port - Which port to use
+    protocol - The protocol for the communication
+    """
+
+    self.server=server
+    self.port=port
+    self.protocol=protocol
+
+  def request(self,*args,**param):
+    transciever=ipc.HTTPTransceiver(self.server,self.port)
+    requestor=ipc.Requestor(self.protocol, transciever)
+    return requestor.request(*args,**param)
+
+
+class TetherTask(object):
+  """
+  Base class for python tether mapreduce programs.
+
+  ToDo: Currently the subclass has to implement both reduce and reduceFlush.
+  This is not very pythonic. A pythonic way to implement the reducer
+  would be to pass the reducer a generator (as dumbo does) so that the user
+  could iterate over the records for the given key.
+  How would we do this? I think we would need two threads: one thread would run
+  the user's reduce function, and this loop would be suspended when no reducer records were available.
+  The other thread would read in the records for the reducer. This thread should
+  only buffer so many records at a time (i.e. if the buffer is full, self.input shouldn't return right
+  away but should wait for space to free up).
+  """
+
+  def __init__(self,inschema=None,midschema=None,outschema=None):
+    """
+
+    Parameters
+    ---------------------------------------------------------
+    inschema - The scheme for the input to the mapper
+    midschema  - The scheme for the output of the mapper
+    outschema - The scheme for the output of the reducer
+
+    An example scheme for the prototypical word count example would be
+    inscheme='{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+              {"name":"key","type":"string"},
+              {"name":"value","type":"long","order":"ignore"}]
+              }'
+
+    Important: The records are split into (key,value) pairs as required by map reduce
+    by using all fields whose "order" isn't "ignore" for the key and the remaining fields for the value.
+
+    The subclass provides these schemas in order to tell this class which schemas it expects.
+    The configure request will also provide the schemas that the parent process is using.
+    This allows us to check whether the schemas match and if not whether we can resolve
+    the differences (see http://avro.apache.org/docs/current/spec.html#Schema+Resolution).
+
+    """
+
+
+    if (inschema is None):
+      raise ValueError("inschema can't be None")
+
+    if (midschema is None):
+      raise ValueError("midschema can't be None")
+
+    if (outschema is None):
+      raise ValueError("outschema can't be None")
+
+    # make sure we can parse the schemas
+    # Should we call fail if we can't parse the schemas?
+    self.inschema=schema.parse(inschema)
+    self.midschema=schema.parse(midschema)
+    self.outschema=schema.parse(outschema)
+
+
+    # declare various variables
+    self.clientTransceiver=None
+
+    # output client is used to communicate with the parent process
+    # in particular to transmit the outputs of the mapper and reducer
+    self.outputClient = None
+
+    # collectors for the output of the mapper and reducer
+    self.midCollector=None
+    self.outCollector=None
+
+    self._partitions=None
+
+    # cache a list of the fields used by the reducer as the keys
+    # we need the fields to decide when we have finished processing all values for
+    # a given key. We cache the fields to be more efficient
+    self._red_fkeys=None
+
+    # We need to keep track of the previous record fed to the reducer
+    # because we need to be able to determine when we start processing a new group
+    # in the reducer
+    self.midRecord=None
+
+    # create an event object to signal when
+    # http server is ready to be shutdown
+    self.ready_for_shutdown=threading.Event()
+    self.log=logging.getLogger("TetherTask")
+
+  def open(self, inputport,clientPort=None):
+    """Open the output client - i.e the connection to the parent process
+
+    Parameters
+    ---------------------------------------------------------------
+    inputport - This is the port that the subprocess is listening on, i.e. the
+                subprocess starts a server listening on this port to accept requests from
+                the parent process
+    clientPort - The port on which the server in the parent process is listening
+                - If this is None we look for the environment variable AVRO_TETHER_OUTPUT_PORT
+                - This is mainly provided for debugging purposes. In practice
+                we want to use the environment variable
+
+    """
+
+
+    # Open the connection to the parent process
+    # The port the parent process is listening on is set in the environment
+    # variable AVRO_TETHER_OUTPUT_PORT
+    # open output client, connecting to parent
+
+    if (clientPort is None):
+      clientPortString = os.getenv("AVRO_TETHER_OUTPUT_PORT")
+      if (clientPortString is None):
+        raise Exception("AVRO_TETHER_OUTPUT_PORT env var is not set")
+
+      clientPort = int(clientPortString)
+
+    self.log.info("TetherTask.open: Opening connection to parent server on port={0}".format(clientPort))
+
+    # We use the HTTP protocol although we hope to shortly have
+    # support for SocketServer.
+    usehttp=True
+
+    if(usehttp):
+      # Since HTTP is stateless, a new transceiver is created and closed for
+      # each request, so clientTransceiver stays None. We still declare
+      # clientTransceiver because other (stateful) protocols will need it, and
+      # when we get the fail message we check whether the transceiver needs
+      # to be closed.
+      self.outputClient =  HTTPRequestor("127.0.0.1",clientPort,outputProtocol)
+
+    else:
+      raise NotImplementedError("Only http protocol is currently supported")
+
+    try:
+      self.outputClient.request('configure',{"port":inputport})
+    except Exception as e:
+      estr= traceback.format_exc()
+      self.fail(estr)
+
+
+  def configure(self,taskType,  inSchemaText,  outSchemaText):
+    """
+
+    Parameters
+    -------------------------------------------------------------------
+    taskType - What type of task (e.g. map, reduce)
+             - This is an enumeration which is specified in the input protocol
+    inSchemaText -  string containing the input schema
+                 - This is the actual schema with which the data was encoded,
+                   i.e. it is the writers_schema (see http://avro.apache.org/docs/current/spec.html#Schema+Resolution)
+                   This is the schema the parent process is using, which might be different
+                   from the one provided by the subclass of tether_task
+
+    outSchemaText - string containing the output schema
+                  - This is the schema expected by the parent process for the output
+    """
+    self.taskType = taskType
+
+    try:
+      inSchema = schema.parse(inSchemaText)
+      outSchema = schema.parse(outSchemaText)
+
+      if (taskType==TaskType.MAP):
+        self.inReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
+        self.midCollector=Collector(outSchemaText,self.outputClient)
+
+      elif(taskType==TaskType.REDUCE):
+        self.midReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
+        self.outCollector=Collector(outSchemaText,self.outputClient)
+
+        # determine which fields in the input record are the keys for the reducer
+        self._red_fkeys=[f.name for f in self.midschema.fields if not(f.order=='ignore')]
+
+    except Exception as e:
+
+      estr= traceback.format_exc()
+      self.fail(estr)
+
+  def set_partitions(self,npartitions):
+
+    try:
+      self._partitions=npartitions
+    except Exception as e:
+      estr= traceback.format_exc()
+      self.fail(estr)
+
+  def get_partitions(self):
+    """ Return the number of map output partitions of this job."""
+    return self._partitions
+
+  def input(self,data,count):
+    """ Recieve input from the server
+
+    Parameters
+    ------------------------------------------------------
+    data - Sould containg the bytes encoding the serialized data
+          - I think this gets represented as a tring
+    count - how many input records are provided in the binary stream
+    """
+    try:
+      # wrap the data in a StringIO so we can feed it to avio.BinaryDecoder
+      bdata=StringIO(data)
+      decoder = avio.BinaryDecoder(bdata)
+
+      for i in range(count):
+        if (self.taskType==TaskType.MAP):
+          inRecord = self.inReader.read(decoder)
+
+          # Do we need to pass midCollector if it's declared as an instance variable?
+          self.map(inRecord, self.midCollector)
+
+        elif (self.taskType==TaskType.REDUCE):
+
+          # store the previous record
+          prev = self.midRecord
+
+          # read the new record
+          self.midRecord = self.midReader.read(decoder)
+          if (prev is not None and not(keys_are_equal(self.midRecord,prev,self._red_fkeys))):
+            # since the key has changed we need to finalize the processing
+            # for this group of key,value pairs
+            self.reduceFlush(prev, self.outCollector)
+          self.reduce(self.midRecord, self.outCollector)
+
+    except Exception as e:
+      estr= traceback.format_exc()
+      self.log.warning("failing: "+estr)
+      self.fail(estr)
+
+  def complete(self):
+    """
+    Process the complete request
+    """
+    if ((self.taskType == TaskType.REDUCE ) and not(self.midRecord is None)):
+      try:
+        self.reduceFlush(self.midRecord, self.outCollector)
+      except Exception as e:
+        estr=traceback.format_exc()
+        self.log.warning("failing: "+estr)
+        self.fail(estr)
+
+    self.outputClient.request("complete",dict())
+
+  def map(self,record,collector):
+    """Called with input values to generate intermediat values (i.e mapper output).
+
+    Parameters
+    ----------------------------------------------------------------------------
+    record - The input record
+    collector - The collector to collect the output
+
+    This is an abstract method which should be overridden by the application-specific
+    subclass.
+    """
+
+    raise NotImplementedError("This is an abstract method which should be overridden in the subclass")
+
+  def reduce(self,record, collector):
+    """ Called with input values to generate reducer output. Inputs are sorted by the mapper
+    key.
+
+    The reduce function is invoked once for each value belonging to a given key outputted
+    by the mapper.
+
+    Parameters
+    ----------------------------------------------------------------------------
+    record - The mapper output
+    collector - The collector to collect the output
+
+    This is an abstract method which should be overridden by the application-specific
+    subclass.
+    """
+
+    raise NotImplementedError("This is an abstract method which should be overridden in the subclass")
+
+  def reduceFlush(self,record, collector):
+    """
+    Called with the last intermediate value in each equivalence run.
+    In other words, reduceFlush is invoked once for each key produced in the reduce
+    phase. It is called after reduce has been invoked on each value for the given key.
+
+    Parameters
+    ------------------------------------------------------------------
+    record - the last record on which reduce was invoked.
+    """
+    raise NotImplementedError("This is an abstract method which should be overridden in the subclass")
+
+  def status(self,message):
+    """
+    Called to update task status
+    """
+    self.outputClient.request("status",{"message":message})
+
+  def count(self,group, name, amount):
+    """
+    Called to increment a counter
+    """
+    self.outputClient.request("count",{"group":group, "name":name, "amount":amount})
+
+  def fail(self,message):
+    """
+    Call to fail the task.
+    """
+    self.log.error("TetherTask.fail: failure occured message follows:\n{0}".format(message))
+    try:
+      self.outputClient.request("fail",{"message":message})
+    except Exception as e:
+      estr=traceback.format_exc()
+      self.log.error("TetherTask.fail: an exception occured while trying to send the fail message to the output server:\n{0}".format(estr))
+
+    self.close()
+
+  def close(self):
+    self.log.info("TetherTask.close: closing")
+    if not(self.clientTransceiver is None):
+      try:
+        self.clientTransceiver.close()
+
+      except Exception as e:
+        # ignore exceptions
+        pass
+
+    # http server is ready to be shutdown
+    self.ready_for_shutdown.set()

Propchange: avro/trunk/lang/py/src/avro/tether/tether_task.py
------------------------------------------------------------------------------
    svn:eol-style = native
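
For orientation, a minimal tether task written against the API above might look
like the sketch below. It is only a sketch: it assumes the Pair schema from the
__init__ docstring, a plain '"string"' input schema, and that the Collector class
defined earlier in tether_task.py exposes a collect(record) method; the
word_count_task.py fixture added elsewhere in this commit may differ in its details.

  from avro.tether import TetherTask

  # Mapper output / reducer output schema. The field with order "ignore" is the
  # value; the remaining field is the key used for grouping in the reducer.
  PAIR_SCHEMA = """{"type":"record", "name":"Pair", "namespace":"org.apache.avro.mapred",
    "fields":[{"name":"key", "type":"string"},
              {"name":"value", "type":"long", "order":"ignore"}]}"""

  class SketchWordCountTask(TetherTask):
    """Hypothetical word-count task, for illustration only."""

    def __init__(self):
      TetherTask.__init__(self, inschema='"string"',
                          midschema=PAIR_SCHEMA, outschema=PAIR_SCHEMA)
      self.psum = 0

    def map(self, record, collector):
      # emit (word, 1) for every word in the input line
      for word in record.split():
        collector.collect({"key": word, "value": 1})

    def reduce(self, record, collector):
      # called once per value of the current key; accumulate the count
      self.psum += record["value"]

    def reduceFlush(self, record, collector):
      # the key has changed (or input ended); emit the total for the previous key
      collector.collect({"key": record["key"], "value": self.psum})
      self.psum = 0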

Added: avro/trunk/lang/py/src/avro/tether/tether_task_runner.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/tether_task_runner.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/tether_task_runner.py (added)
+++ avro/trunk/lang/py/src/avro/tether/tether_task_runner.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,227 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["TaskRunner"]
+
+if __name__ == "__main__":
+  # Relative imports don't work when being run directly
+  from avro import tether
+  from avro.tether import TetherTask, find_port, inputProtocol
+
+else:
+  from . import TetherTask, find_port, inputProtocol
+
+from avro import ipc
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import logging
+import weakref
+import threading
+import sys
+import traceback
+
+class TaskRunnerResponder(ipc.Responder):
+  """
+  The responder for the tethered process
+  """
+  def __init__(self,runner):
+    """
+    Parameters
+    ----------------------------------------------------------
+    runner - Instance of TaskRunner
+    """
+    ipc.Responder.__init__(self, inputProtocol)
+
+    self.log=logging.getLogger("TaskRunnerResponder")
+
+    # We use a weak reference because self.runner owns this instance of
+    # TaskRunnerResponder; this avoids a circular reference.
+    if isinstance(runner,weakref.ProxyType):
+      self.runner=runner
+    else:
+      self.runner=weakref.proxy(runner)
+
+    self.task=weakref.proxy(runner.task)
+
+  def invoke(self, message, request):
+    try:
+      if message.name=='configure':
+        self.log.info("TetherTaskRunner: Recieved configure")
+        self.task.configure(request["taskType"],request["inSchema"],request["outSchema"])
+      elif message.name=='partitions':
+        self.log.info("TetherTaskRunner: Recieved partitions")
+        try:
+          self.task.set_partitions(request["partitions"])
+        except Exception as e:
+          self.log.error("Exception occured while processing the partitions message: Message:\n"+traceback.format_exc())
+          raise
+      elif message.name=='input':
+        self.log.info("TetherTaskRunner: Recieved input")
+        self.task.input(request["data"],request["count"])
+      elif message.name=='abort':
+        self.log.info("TetherTaskRunner: Recieved abort")
+        self.runner.close()
+      elif message.name=='complete':
+        self.log.info("TetherTaskRunner: Recieved complete")
+        self.task.complete()
+        self.task.close()
+        self.runner.close()
+      else:
+        self.log.warning("TetherTaskRunner: recieved unknown message {0}".format(message.name))
+
+    except Exception as e:
+      self.log.error("Error occured while processing message: {0}".format(message.name))
+      emsg=traceback.format_exc()
+      self.task.fail(emsg)
+
+    return None
+
+
+def HTTPHandlerGen(runner):
+  """
+  This is a class factory for the HTTPHandler. We need
+  a factory because we need a reference to the runner
+
+  Parameters
+  -----------------------------------------------------------------
+  runner - instance of the task runner
+  """
+
+  if not(isinstance(runner,weakref.ProxyType)):
+    runnerref=weakref.proxy(runner)
+  else:
+    runnerref=runner
+
+  class TaskRunnerHTTPHandler(BaseHTTPRequestHandler):
+    """Create a handler for the parent.
+    """
+
+    runner=runnerref
+    def __init__(self,*args,**param):
+      """
+      """
+      BaseHTTPRequestHandler.__init__(self,*args,**param)
+
+    def do_POST(self):
+      self.responder =TaskRunnerResponder(self.runner)
+      call_request_reader = ipc.FramedReader(self.rfile)
+      call_request = call_request_reader.read_framed_message()
+      resp_body = self.responder.respond(call_request)
+      self.send_response(200)
+      self.send_header('Content-Type', 'avro/binary')
+      self.end_headers()
+      resp_writer = ipc.FramedWriter(self.wfile)
+      resp_writer.write_framed_message(resp_body)
+
+  return TaskRunnerHTTPHandler
+
+class TaskRunner(object):
+  """This class ties together the server handling the requests from
+  the parent process and the instance of TetherTask which actually
+  implements the logic for the mapper and reducer phases
+  """
+
+  def __init__(self,task):
+    """
+    Construct the runner
+
+    Parameters
+    ---------------------------------------------------------------
+    task - An instance of tether task
+    """
+
+    self.log=logging.getLogger("TaskRunner:")
+
+    if not(isinstance(task,TetherTask)):
+      raise ValueError("task must be an instance of tether task")
+    self.task=task
+
+    self.server=None
+    self.sthread=None
+
+  def start(self,outputport=None,join=True):
+    """
+    Start the server
+
+    Parameters
+    -------------------------------------------------------------------
+    outputport - (optional) The port on which the parent process is listening
+                 for requests from the task.
+               - This will typically be supplied by an environment variable;
+                 we allow it to be supplied as an argument mainly for debugging.
+    join       - (optional) If set to False then we don't block until the
+                 thread executing the server terminates.
+                This is mainly for debugging. By setting it to False,
+                we can resume execution in this thread so that we can do additional
+                testing
+    """
+
+    port=find_port()
+    address=("localhost",port)
+
+
+    def thread_run(task_runner=None):
+      task_runner.server = HTTPServer(address, HTTPHandlerGen(task_runner))
+      task_runner.server.allow_reuse_address = True
+      task_runner.server.serve_forever()
+
+    # create a separate thread for the http server
+    sthread=threading.Thread(target=thread_run,kwargs={"task_runner":self})
+    sthread.start()
+
+    self.sthread=sthread
+    # This needs to run in a separate thread because serve_forever() blocks
+    self.task.open(port,clientPort=outputport)
+
+    # wait for the other thread to finish
+    if (join):
+      self.task.ready_for_shutdown.wait()
+      self.server.shutdown()
+
+      # should we do some kind of check to make sure it exits
+      self.log.info("Shutdown the logger")
+      # shutdown the logging
+      logging.shutdown()
+
+  def close(self):
+    """
+    Handler for the close message
+    """
+
+    self.task.close()
+
+if __name__ == '__main__':
+  # TODO::Make the logging level a parameter we can set
+  # logging.basicConfig(level=logging.INFO,filename='/tmp/log',filemode='w')
+  logging.basicConfig(level=logging.INFO)
+
+  if (len(sys.argv)<=1):
+    print "Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass"
+    raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")
+
+  fullcls=sys.argv[1]
+  mod,cname=fullcls.rsplit(".",1)
+
+  logging.info("tether_task_runner.__main__: Task: {0}".format(fullcls))
+
+  modobj=__import__(mod,fromlist=cname)
+
+  taskcls=getattr(modobj,cname)
+  task=taskcls()
+
+  runner=TaskRunner(task=task)
+  runner.start()

Propchange: avro/trunk/lang/py/src/avro/tether/tether_task_runner.py
------------------------------------------------------------------------------
    svn:eol-style = native
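
Besides the command-line entry point in __main__ above, the runner can be driven
programmatically, which is roughly what the unit tests added later in this commit
do. The snippet below is a sketch: SketchWordCountTask is the hypothetical task
from the earlier sketch, the port number is arbitrary, and it assumes a parent
(or mock parent) server is already listening on that port.

  from avro.tether.tether_task_runner import TaskRunner

  task = SketchWordCountTask()   # hypothetical task class
  runner = TaskRunner(task=task)

  # join=False returns immediately instead of blocking until the HTTP server
  # thread shuts down; outputport overrides AVRO_TETHER_OUTPUT_PORT.
  runner.start(outputport=12345, join=False)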

Added: avro/trunk/lang/py/src/avro/tether/util.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/util.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/util.py (added)
+++ avro/trunk/lang/py/src/avro/tether/util.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,34 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["find_port"]
+
+import socket
+
+
+def find_port():
+  """
+  Return an unbound port
+  """
+  s=socket.socket()
+  s.bind(("127.0.0.1",0))
+
+  port=s.getsockname()[1]
+  s.close()
+
+  return port
\ No newline at end of file

Propchange: avro/trunk/lang/py/src/avro/tether/util.py
------------------------------------------------------------------------------
    svn:eol-style = native
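
Note that find_port only reports a port that was unbound at the moment of the
call; the socket is closed again, so another process could in principle claim
the port before it is reused. A typical use, mirroring the tests added below
(the import path follows the package layout added in this commit):

  from avro.tether import find_port

  port = find_port()   # a port that was free when we probed
  # hand the number to whichever process will actually bind it, e.g. the
  # task runner's HTTP server or the mock parent server used by the tests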

Added: avro/trunk/lang/py/test/mock_tether_parent.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/mock_tether_parent.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/mock_tether_parent.py (added)
+++ avro/trunk/lang/py/test/mock_tether_parent.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import set_avro_test_path
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from avro import ipc
+from avro import protocol
+from avro import tether
+
+import socket
+
+def find_port():
+  """
+  Return an unbound port
+  """
+  s=socket.socket()
+  s.bind(("127.0.0.1",0))
+
+  port=s.getsockname()[1]
+  s.close()
+
+  return port
+
+SERVER_ADDRESS = ('localhost', find_port())
+
+class MockParentResponder(ipc.Responder):
+  """
+  The responder for the mocked parent
+  """
+  def __init__(self):
+    ipc.Responder.__init__(self, tether.outputProtocol)
+
+  def invoke(self, message, request):
+    if message.name=='configure':
+      print "MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"])
+
+    elif message.name=='status':
+      print "MockParentResponder: Recieved 'status': message={0}".format(request["message"])
+    elif message.name=='fail':
+      print "MockParentResponder: Recieved 'fail': message={0}".format(request["message"])
+    else:
+      print "MockParentResponder: Recieved {0}".format(message.name)
+
+    # flush the output so it shows up in the parent process
+    sys.stdout.flush()
+
+    return None
+
+class MockParentHandler(BaseHTTPRequestHandler):
+  """Create a handler for the parent.
+  """
+  def do_POST(self):
+    self.responder =MockParentResponder()
+    call_request_reader = ipc.FramedReader(self.rfile)
+    call_request = call_request_reader.read_framed_message()
+    resp_body = self.responder.respond(call_request)
+    self.send_response(200)
+    self.send_header('Content-Type', 'avro/binary')
+    self.end_headers()
+    resp_writer = ipc.FramedWriter(self.wfile)
+    resp_writer.write_framed_message(resp_body)
+
+if __name__ == '__main__':
+  if (len(sys.argv)<=1):
+    raise ValueError("Usage: mock_tether_parent command")
+
+  cmd=sys.argv[1].lower()
+  if (cmd=='start_server'):
+    if (len(sys.argv)==3):
+      port=int(sys.argv[2])
+    else:
+      raise ValueError("Usage: mock_tether_parent start_server port")
+
+    SERVER_ADDRESS=(SERVER_ADDRESS[0],port)
+    print "mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1])
+
+    # flush the output so it shows up in the parent process
+    sys.stdout.flush()
+    parent_server = HTTPServer(SERVER_ADDRESS, MockParentHandler)
+    parent_server.allow_reuse_address = True
+    parent_server.serve_forever()

Propchange: avro/trunk/lang/py/test/mock_tether_parent.py
------------------------------------------------------------------------------
    svn:eol-style = native
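
For manual debugging the mock parent can also be started by hand; the port
argument is required, matching the check in __main__ above (the port number
here is arbitrary):

  python mock_tether_parent.py start_server 12345

It then prints each message it receives ('configure', 'status', 'fail', ...) to
stdout, which is what the TODO in test_tether_task.py below suggests capturing
in order to validate the task's behavior.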

Added: avro/trunk/lang/py/test/set_avro_test_path.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/set_avro_test_path.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/set_avro_test_path.py (added)
+++ avro/trunk/lang/py/test/set_avro_test_path.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,25 @@
+"""
+This module adjusts sys.path so the unittests
+will work even if an egg for AVRO is already installed.
+By default eggs always appear higher on Python's path than
+directories set via the environment variable PYTHONPATH.
+
+For reference see:
+http://www.velocityreviews.com/forums/t716589-pythonpath-and-eggs.html
+http://stackoverflow.com/questions/897792/pythons-sys-path-value.
+
+Unittests would therefore use the installed AVRO and not the AVRO
+being built. To work around this the unittests import this module before
+importing AVRO. This module in turn adjusts the Python path so that the test
+build of AVRO is higher on the path than any installed eggs.
+"""
+import sys
+import os
+
+# determine the build directory and then make sure all paths that start with the
+# build directory are at the top of the path
+builddir=os.path.split(os.path.split(__file__)[0])[0]
+bpaths=filter(lambda s:s.startswith(builddir), sys.path)
+
+for p in bpaths:
+  sys.path.insert(0,p)
\ No newline at end of file

Propchange: avro/trunk/lang/py/test/set_avro_test_path.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/py/test/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_datafile.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_datafile.py (original)
+++ avro/trunk/lang/py/test/test_datafile.py Tue Oct  7 14:02:17 2014
@@ -15,6 +15,9 @@
 # limitations under the License.
 import os
 import unittest
+
+import set_avro_test_path
+
 from avro import schema
 from avro import io
 from avro import datafile

Modified: avro/trunk/lang/py/test/test_datafile_interop.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_datafile_interop.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_datafile_interop.py (original)
+++ avro/trunk/lang/py/test/test_datafile_interop.py Tue Oct  7 14:02:17 2014
@@ -15,6 +15,9 @@
 # limitations under the License.
 import os
 import unittest
+
+import set_avro_test_path
+
 from avro import io
 from avro import datafile
 

Modified: avro/trunk/lang/py/test/test_io.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_io.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_io.py (original)
+++ avro/trunk/lang/py/test/test_io.py Tue Oct  7 14:02:17 2014
@@ -19,6 +19,9 @@ try:
 except ImportError:
   from StringIO import StringIO
 from binascii import hexlify
+
+import set_avro_test_path
+
 from avro import schema
 from avro import io
 

Modified: avro/trunk/lang/py/test/test_ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_ipc.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_ipc.py (original)
+++ avro/trunk/lang/py/test/test_ipc.py Tue Oct  7 14:02:17 2014
@@ -19,6 +19,8 @@ servers yet available.
 """
 import unittest
 
+import set_avro_test_path
+
 # This test does import this code, to make sure it at least passes
 # compilation.
 from avro import ipc

Modified: avro/trunk/lang/py/test/test_schema.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_schema.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_schema.py (original)
+++ avro/trunk/lang/py/test/test_schema.py Tue Oct  7 14:02:17 2014
@@ -17,6 +17,8 @@
 Test the schema parsing logic.
 """
 import unittest
+import set_avro_test_path
+
 from avro import schema
 
 def print_test_name(test_name):

Added: avro/trunk/lang/py/test/test_tether_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_task.py (added)
+++ avro/trunk/lang/py/test/test_tether_task.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+import set_avro_test_path
+
+class TestTetherTask(unittest.TestCase):
+  """
+  TODO: We should validate the server response by looking at stdout
+  """
+  def test1(self):
+    """
+    Test that the tether_task is working. We run the mock_tether_parent in a separate
+    subprocess
+    """
+    from avro import tether
+    from avro import io as avio
+    from avro import schema
+    from avro.tether import HTTPRequestor,inputProtocol, find_port
+
+    import StringIO
+    import mock_tether_parent
+    from word_count_task import WordCountTask
+
+    task=WordCountTask()
+
+    proc=None
+    try:
+      # launch the server in a separate process
+      # env["AVRO_TETHER_OUTPUT_PORT"]=output_port
+      env=dict()
+      env["PYTHONPATH"]=':'.join(sys.path)
+      server_port=find_port()
+
+      pyfile=mock_tether_parent.__file__
+      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
+      input_port=find_port()
+
+      print "Mock server started process pid={0}".format(proc.pid)
+      # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+      # so we give the subprocess time to start up
+      time.sleep(1)
+      task.open(input_port,clientPort=server_port)
+
+      # TODO: We should validate that open worked by grabbing the STDOUT of the subprocess
+      # and ensuring that it outputted the correct message.
+
+      #***************************************************************
+      # Test the mapper
+      task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema))
+
+      # Serialize some data so we can send it to the input function
+      datum="This is a line of text"
+      writer = StringIO.StringIO()
+      encoder = avio.BinaryEncoder(writer)
+      datum_writer = avio.DatumWriter(task.inschema)
+      datum_writer.write(datum, encoder)
+
+      writer.seek(0)
+      data=writer.read()
+
+      # Call input to simulate calling map
+      task.input(data,1)
+
+      # Test the reducer
+      task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema))
+
+      # Serialize some data so we can send it to the input function
+      datum={"key":"word","value":2}
+      writer = StringIO.StringIO()
+      encoder = avio.BinaryEncoder(writer)
+      datum_writer = avio.DatumWriter(task.midschema)
+      datum_writer.write(datum, encoder)
+
+      writer.seek(0)
+      data=writer.read()
+
+      # Call input to simulate calling reduce
+      task.input(data,1)
+
+      task.complete()
+
+      # try a status
+      task.status("Status message")
+
+    except Exception as e:
+      raise
+    finally:
+      # close the process
+      if not(proc is None):
+        proc.kill()
+
+      pass
+
+if __name__ == '__main__':
+  unittest.main()
\ No newline at end of file

Propchange: avro/trunk/lang/py/test/test_tether_task.py
------------------------------------------------------------------------------
    svn:eol-style = native
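
The new tests are plain unittest modules, so they can also be run directly once
the in-tree avro package is importable; set_avro_test_path only reorders entries
that are already on the path, so the source directory still has to be supplied.
A possible invocation (run from lang/py/test, and illustrative only):

  PYTHONPATH=../src python test_tether_task.py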