You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/10/07 16:02:18 UTC
svn commit: r1629897 [1/2] - in /avro/trunk: ./ lang/java/ lang/java/mapred/
lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/
lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/
lang/java/tools/ lang/java/tools/src/main/java/org/...
Author: cutting
Date: Tue Oct 7 14:02:17 2014
New Revision: 1629897
URL: http://svn.apache.org/r1629897
Log:
AVRO-570. Python: Add connector for tethered mapreduce. Contributed by Jeremy Lewi and Steven Willis.
Added:
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java (with props)
avro/trunk/lang/py/src/avro/tether/
avro/trunk/lang/py/src/avro/tether/__init__.py (with props)
avro/trunk/lang/py/src/avro/tether/tether_task.py (with props)
avro/trunk/lang/py/src/avro/tether/tether_task_runner.py (with props)
avro/trunk/lang/py/src/avro/tether/util.py (with props)
avro/trunk/lang/py/test/mock_tether_parent.py (with props)
avro/trunk/lang/py/test/set_avro_test_path.py (with props)
avro/trunk/lang/py/test/test_tether_task.py (with props)
avro/trunk/lang/py/test/test_tether_task_runner.py (with props)
avro/trunk/lang/py/test/test_tether_word_count.py (with props)
avro/trunk/lang/py/test/word_count_task.py (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/pom.xml
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
avro/trunk/lang/java/pom.xml
avro/trunk/lang/java/tools/pom.xml
avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java
avro/trunk/lang/py/ (props changed)
avro/trunk/lang/py/build.xml
avro/trunk/lang/py/test/test_datafile.py
avro/trunk/lang/py/test/test_datafile_interop.py
avro/trunk/lang/py/test/test_io.py
avro/trunk/lang/py/test/test_ipc.py
avro/trunk/lang/py/test/test_schema.py
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Oct 7 14:02:17 2014
@@ -23,6 +23,9 @@ Trunk (not yet released)
AVRO-1502. Java: Generated classes now implement Serializable. (cutting)
+ AVRO-570. Python: Add connector for tethered mapreduce.
+ (Jeremy Lewi and Steven Willis via cutting)
+
OPTIMIZATIONS
IMPROVEMENTS
Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Tue Oct 7 14:02:17 2014
@@ -140,6 +140,11 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ </dependency>
</dependencies>
<profiles>
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java Tue Oct 7 14:02:17 2014
@@ -41,6 +41,7 @@ public class TetherJob extends Configure
public static final String TETHER_EXEC="avro.tether.executable";
public static final String TETHER_EXEC_ARGS="avro.tether.executable_args";
public static final String TETHER_EXEC_CACHED="avro.tether.executable_cached";
+ public static final String TETHER_PROTOCOL="avro.tether.protocol";
/** Get the URI of the application's executable. */
public static URI getExecutable(JobConf job) {
@@ -61,22 +62,44 @@ public class TetherJob extends Configure
* and provides the mapper/reducer).
* @param job - Job
* @param executable - The URI of the executable
- * @param args - A string of additional arguments
+ * @param args - List of additional arguments; Null if no arguments
* @param cached - If true, the executable URI is cached using DistributedCache
* - if false its not cached. I.e if the file is already stored on each local file system
* or if its on a NFS share
*/
public static void setExecutable(JobConf job, File executable, List<String> args, boolean cached) {
job.set(TETHER_EXEC, executable.toString());
- StringBuilder sb = new StringBuilder();
- for (String a : args) {
- sb.append(a);
- sb.append('\n');
+ if (args != null){
+ StringBuilder sb = new StringBuilder();
+ for (String a : args) {
+ sb.append(a);
+ sb.append('\n');
+ }
+ job.set(TETHER_EXEC_ARGS, sb.toString());
}
- job.set(TETHER_EXEC_ARGS, sb.toString());
job.set(TETHER_EXEC_CACHED, (new Boolean(cached)).toString());
}
+ /**
+ * Extract from the job configuration the TetheredProcess.Protocol value
+ * that identifies the transport protocol to use for the communication
+ * @param job - job configuration
+ * @return the configured protocol, or Protocol.NONE if none is set
+ */
+ public static TetheredProcess.Protocol getProtocol(JobConf job) {
+
+ if (job.get(TetherJob.TETHER_PROTOCOL)==null) {
+ return TetheredProcess.Protocol.NONE;
+ } else if (job.get(TetherJob.TETHER_PROTOCOL).equals("http")) {
+ return TetheredProcess.Protocol.HTTP;
+ } else if (job.get(TetherJob.TETHER_PROTOCOL).equals("sasl")) {
+ return TetheredProcess.Protocol.SASL;
+ } else {
+ throw new RuntimeException("Unknown value for protocol: " +job.get(TetherJob.TETHER_PROTOCOL));
+ }
+
+ }
+
/** Submit a job to the map/reduce cluster. All of the necessary
* modifications to the job to run under tether are made to the
* configuration.
@@ -92,6 +115,24 @@ public class TetherJob extends Configure
return new JobClient(conf).submitJob(conf);
}
+ /**
+ * Sets which transport protocol (e.g. http or sasl) is used to communicate
+ * between the parent and subprocess
+ *
+ * @param job - job configuration
+ * @param proto - String identifying the protocol; currently http or sasl
+ */
+ public static void setProtocol(JobConf job, String proto) throws IOException {
+ proto=proto.trim().toLowerCase();
+
+ if (!(proto.equals("http") || proto.equals("sasl"))) {
+ throw new IOException("protocol must be 'http' or 'sasl'");
+ }
+
+ job.set(TETHER_PROTOCOL,proto);
+
+ }
+
private static void setupTetherJob(JobConf job) throws IOException {
job.setMapRunnerClass(TetherMapRunner.class);
job.setPartitionerClass(TetherPartitioner.class);
@@ -107,6 +148,11 @@ public class TetherJob extends Configure
// set the map output key class to TetherData
job.setMapOutputKeyClass(TetherData.class);
+ // if protocol isn't set, default to sasl
+ if (job.getStrings(TETHER_PROTOCOL)==null) {
+ job.set(TETHER_PROTOCOL, "sasl");
+ }
+
// add TetherKeySerialization to io.serializations
Collection<String> serializations =
job.getStringCollection("io.serializations");
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Tue Oct 7 14:02:17 2014
@@ -80,6 +80,7 @@ class TetherKeySerialization
public void serialize(TetherData datum) throws IOException {
encoder.writeBytes(datum.buffer());
+ encoder.flush(); //Flush shouldn't be required. Might be a bug in AVRO.
}
public void close() throws IOException {
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherMapRunner.java Tue Oct 7 14:02:17 2014
@@ -36,7 +36,7 @@ import org.apache.avro.mapred.AvroJob;
class TetherMapRunner
extends MapRunner<TetherData, NullWritable, TetherData, NullWritable> {
- static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
private JobConf job;
private TetheredProcess process;
@@ -54,11 +54,13 @@ class TetherMapRunner
process = new TetheredProcess(job, collector, reporter);
// configure it
+ LOG.info("send configure to subprocess for map task");
process.inputClient.configure
(TaskType.MAP,
job.get(AvroJob.INPUT_SCHEMA),
AvroJob.getMapOutputSchema(job).toString());
+ LOG.info("send partitions to subprocess for map task");
process.inputClient.partitions(job.getNumReduceTasks());
// run map
@@ -72,6 +74,7 @@ class TetherMapRunner
if (process.outputService.isFinished())
break;
}
+ LOG.info("send complete to subprocess for map task");
process.inputClient.complete();
// wait for completion
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherOutputService.java Tue Oct 7 14:02:17 2014
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class TetherOutputService implements OutputProtocol {
private Reporter reporter;
@@ -31,6 +33,11 @@ class TetherOutputService implements Out
private boolean complete;
private String error;
+ private static final Logger LOG = LoggerFactory.getLogger(TetherOutputService.class);
+
+ // timeout when waiting for messages, in milliseconds (10 seconds).
+ // what is a good value?
+ public static final long TIMEOUT=10*1000;
public TetherOutputService(OutputCollector<TetherData,NullWritable> collector,
Reporter reporter) {
this.reporter = reporter;
@@ -38,15 +45,20 @@ class TetherOutputService implements Out
}
public synchronized void configure(int inputPort) {
- TetherMapRunner.LOG.info("got input port from child");
+ LOG.info("got input port from child: inputport="+inputPort);
this.inputPort = inputPort;
notify();
}
- public synchronized int inputPort() throws InterruptedException {
- while (inputPort == 0) {
- TetherMapRunner.LOG.info("waiting for input port from child");
- wait();
+ public synchronized int inputPort() throws Exception {
+ if (inputPort==0) {
+ LOG.info("waiting for input port from child");
+ wait(TIMEOUT);
+ }
+
+ if (inputPort==0) {
+ LOG.error("Parent process timed out waiting for subprocess to send input port. Check the job log files for more info.");
+ throw new Exception("Parent process timed out waiting for subprocess to send input port");
}
return inputPort;
}
@@ -55,7 +67,7 @@ class TetherOutputService implements Out
try {
collector.collect(new TetherData(datum), NullWritable.get());
} catch (Throwable e) {
- TetherMapRunner.LOG.warn("Error: "+e, e);
+ LOG.warn("Error: "+e, e);
synchronized (this) {
error = e.toString();
}
@@ -75,13 +87,13 @@ class TetherOutputService implements Out
}
public synchronized void fail(String message) {
- TetherMapRunner.LOG.warn("Failing: "+message);
+ LOG.warn("Failing: "+message);
error = message.toString();
notify();
}
public synchronized void complete() {
- TetherMapRunner.LOG.info("got task complete");
+ LOG.info("got task complete");
complete = true;
notify();
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java Tue Oct 7 14:02:17 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -43,13 +44,15 @@ import org.apache.avro.ipc.SaslSocketSer
import org.apache.avro.ipc.SaslSocketTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TetheredProcess {
- static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+ static final Logger LOG = LoggerFactory.getLogger(TetheredProcess.class);
private JobConf job;
@@ -59,15 +62,44 @@ class TetheredProcess {
Transceiver clientTransceiver;
InputProtocol inputClient;
+ /**
+ * Enumeration defines which transport protocol to use to communicate between
+ * the map/reduce java daemons and the tethered process
+ */
+ public enum Protocol {HTTP,SASL,NONE};
+
+ //which protocol we are using
+ Protocol proto;
+
public TetheredProcess(JobConf job,
OutputCollector<TetherData, NullWritable> collector,
Reporter reporter) throws Exception {
try {
// start server
this.outputService = new TetherOutputService(collector, reporter);
- this.outputServer = new SaslSocketServer
- (new SpecificResponder(OutputProtocol.class, outputService),
- new InetSocketAddress(0));
+
+ proto=TetherJob.getProtocol(job);
+
+ InetSocketAddress iaddress;
+ switch (proto) {
+ case SASL:
+ iaddress=new InetSocketAddress(0);
+ this.outputServer = new SaslSocketServer
+ (new SpecificResponder(OutputProtocol.class, outputService),
+ iaddress);
+ break;
+ case HTTP:
+ iaddress=new InetSocketAddress(0);
+ //set it up for http
+ this.outputServer= new HttpServer
+ (new SpecificResponder(OutputProtocol.class, outputService),
+ iaddress.getPort());
+ break;
+ case NONE:
+ default:
+ throw new RuntimeException("No transport protocol was specified in the job configuraiton");
+ }
+
outputServer.start();
// start sub-process, connecting back to server
@@ -85,8 +117,18 @@ class TetheredProcess {
LOG.error("Could not start subprocess");
throw new RuntimeException("Could not start subprocess");
}
- this.clientTransceiver
- = new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+ // open client, connecting to sub-process
+ switch (proto) {
+ case SASL:
+ this.clientTransceiver =new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+ break;
+ case HTTP:
+ this.clientTransceiver =new HttpTransceiver(new URL("http://127.0.0.1:"+outputService.inputPort()));
+ break;
+ default:
+ throw new RuntimeException("Error: code to handle this protocol is not implemented");
+ }
+
this.inputClient =
SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
@@ -131,17 +173,21 @@ class TetheredProcess {
command.add(executable);
// Add the executable arguments. We assume the arguments are separated by
- // spaces so we split the argument string based on spaces and add each
+ // newlines so we split the argument string based on newlines and add each
// token to command We need to do it this way because
// TaskLog.captureOutAndError will put quote marks around each argument so
// if we pass a single string containing all arguments we get quoted
// incorrectly
String args=job.get(TetherJob.TETHER_EXEC_ARGS);
- String[] aparams=args.split("\n");
- for (int i=0;i<aparams.length; i++){
- aparams[i]=aparams[i].trim();
- if (aparams[i].length()>0){
- command.add(aparams[i]);
+
+ // args might be null if TETHER_EXEC_ARGS wasn't set.
+ if (args != null) {
+ String[] aparams=args.split("\n");
+ for (int i=0;i<aparams.length; i++){
+ aparams[i]=aparams[i].trim();
+ if (aparams[i].length()>0){
+ command.add(aparams[i]);
+ }
}
}
@@ -163,6 +209,18 @@ class TetheredProcess {
env.put("AVRO_TETHER_OUTPUT_PORT",
Integer.toString(outputServer.getPort()));
+ // add an environment variable to specify what protocol to use for communication
+ env.put("AVRO_TETHER_PROTOCOL", job.get(TetherJob.TETHER_PROTOCOL));
+
+ // print an info message about the command
+ String imsg="";
+ for (int i=0; i<command.size();i++) {
+ imsg=command.get(i)+" ";
+ }
+ LOG.info("TetheredProcess.startSubprocess: command: "+imsg);
+ LOG.info("Tetheredprocess.startSubprocess: stdout logged to: " + stdout.toString()) ;
+ LOG.info("Tetheredprocess.startSubprocess: stderr logged to: " + stderr.toString()) ;
+
// start child process
ProcessBuilder builder = new ProcessBuilder(command);
System.out.println(command);
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java Tue Oct 7 14:02:17 2014
@@ -43,13 +43,19 @@ import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
import org.apache.avro.specific.SpecificDatumReader;
+/**
+ * See also TestTetherTool for an example of how to submit jobs using the tether tool.
+ *
+ */
public class TestWordCountTether {
- @Test
- @SuppressWarnings("deprecation")
- public void testJob() throws Exception {
-
+ /**
+ * Run a job using the given transport protocol
+ * @param proto
+ */
+ private void _runjob(String proto)throws Exception {
+ // System.out.println(System.getProperty("java.class.path").replace(":", "\n"));
System.out.println(System.getProperty("java.class.path"));
JobConf job = new JobConf();
String dir = System.getProperty("test.dir", ".") + "/mapred";
@@ -80,6 +86,7 @@ public class TestWordCountTether {
AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
job.set(AvroJob.OUTPUT_SCHEMA, outscheme.toString());
+ TetherJob.setProtocol(job, proto);
TetherJob.runJob(job);
// validate the output
@@ -100,5 +107,23 @@ public class TestWordCountTether {
}
+ /**
+ * Test the job using the sasl protocol
+ * @throws Exception
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testJob() throws Exception {
+ _runjob("sasl");
+ }
+ /**
+ * Test the job using the http protocol
+ * @throws Exception
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testhtp() throws Exception {
+ _runjob("http");
+ }
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java Tue Oct 7 14:02:17 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.net.InetSocketAddress;
+import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.avro.Schema;
+import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.SaslSocketTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -61,6 +63,8 @@ public abstract class TetherTask<IN,MID,
private Collector<MID> midCollector;
private Collector<OUT> outCollector;
+ private TetheredProcess.Protocol proto;
+
private static class Buffer extends ByteArrayOutputStream {
public ByteBuffer data() {
return ByteBuffer.wrap(buf, 0, count);
@@ -98,12 +102,37 @@ public abstract class TetherTask<IN,MID,
void open(int inputPort) throws IOException {
// open output client, connecting to parent
String clientPortString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
+ String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
if (clientPortString == null)
throw new RuntimeException("AVRO_TETHER_OUTPUT_PORT env var is null");
int clientPort = Integer.parseInt(clientPortString);
- this.clientTransceiver =
+
+ if (protocol == null) {
+ throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
+ }
+
+ protocol=protocol.trim().toLowerCase();
+
+ if (protocol.equals("http")) {
+ proto=TetheredProcess.Protocol.HTTP;
+ } else if (protocol.equals("sasl")) {
+ proto=TetheredProcess.Protocol.SASL;
+ } else {
+ throw new RuntimeException("AVROT_TETHER_PROTOCOL="+protocol+" but this protocol is unsupported");
+ }
+
+ switch (proto) {
+ case SASL:
+ this.clientTransceiver =
new SaslSocketTransceiver(new InetSocketAddress(clientPort));
- this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+ this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+ break;
+
+ case HTTP:
+ this.clientTransceiver =new HttpTransceiver(new URL("http://127.0.0.1:"+clientPort));
+ this.outputClient = SpecificRequestor.getClient(OutputProtocol.class, clientTransceiver);
+ break;
+ }
// send inputPort to parent
outputClient.configure(inputPort);
@@ -167,7 +196,9 @@ public abstract class TetherTask<IN,MID,
LOG.warn("failing: "+e, e);
fail(e.toString());
}
+ LOG.info("TetherTask: Sending complete to parent process.");
outputClient.complete();
+ LOG.info("TetherTask: Done sending complete to parent process.");
}
/** Called with input values to generate intermediate values. */
@@ -197,6 +228,7 @@ public abstract class TetherTask<IN,MID,
}
void close() {
+ LOG.info("Closing the transciever");
if (clientTransceiver != null)
try {
clientTransceiver.close();
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTaskRunner.java Tue Oct 7 14:02:17 2014
@@ -20,13 +20,19 @@ package org.apache.avro.mapred.tether;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.SaslSocketServer;
+import org.apache.avro.ipc.SaslSocketTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.ipc.Server;
/** Java implementation of a tether executable. Useless except for testing,
* since it's already possible to write Java MapReduce programs without
@@ -35,16 +41,52 @@ import org.apache.avro.ipc.specific.Spec
public class TetherTaskRunner implements InputProtocol {
static final Logger LOG = LoggerFactory.getLogger(TetherTaskRunner.class);
- private SaslSocketServer inputServer;
+ private Server inputServer;
private TetherTask task;
+ private TetheredProcess.Protocol proto;
+
public TetherTaskRunner(TetherTask task) throws IOException {
this.task = task;
- // start input server
- this.inputServer = new SaslSocketServer
+ //determine what protocol we are using
+ String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
+ if (protocol == null) {
+ throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
+ }
+
+ protocol=protocol.trim().toLowerCase();
+
+ if (protocol.equals("http")) {
+ LOG.info("Use HTTP protocol");
+ proto=TetheredProcess.Protocol.HTTP;
+ } else if (protocol.equals("sasl")) {
+ LOG.info("Use SASL protocol");
+ proto=TetheredProcess.Protocol.SASL;
+ } else {
+ throw new RuntimeException("AVRO_TETHER_PROTOCOL="+protocol+" but this protocol is unsupported");
+ }
+
+ InetSocketAddress iaddress=new InetSocketAddress(0);
+
+ switch(proto) {
+ case SASL:
+ // start input server
+ this.inputServer = new SaslSocketServer
+ (new SpecificResponder(InputProtocol.class, this),
+ iaddress);
+ LOG.info("Started SaslSocketServer on port:"+iaddress.getPort());
+ break;
+
+ case HTTP:
+ this.inputServer=new HttpServer
(new SpecificResponder(InputProtocol.class, this),
- new InetSocketAddress(0));
+ iaddress.getPort());
+
+ LOG.info("Started HttpServer on port:"+iaddress.getPort());
+ break;
+ }
+
inputServer.start();
// open output to parent
@@ -74,16 +116,19 @@ public class TetherTaskRunner implements
@Override public synchronized void complete() {
LOG.info("got input complete");
task.complete();
- close();
}
/** Wait for task to complete. */
public void join() throws InterruptedException {
+ LOG.info("TetherTaskRunner: Start join.");
inputServer.join();
+ LOG.info("TetherTaskRunner: Finish join.");
}
private void close() {
+ LOG.info("Closing the task");
task.close();
+ LOG.info("Finished closing the task.");
if (inputServer != null)
inputServer.close();
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java Tue Oct 7 14:02:17 2014
@@ -24,11 +24,15 @@ import java.util.StringTokenizer;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/** Example Java tethered mapreduce executable. Implements map and reduce
* functions for word count. */
public class WordCountTask
extends TetherTask<Utf8,Pair<Utf8,Long>,Pair<Utf8,Long>> {
+ static final Logger LOG = LoggerFactory.getLogger(WordCountTask.class);
@Override public void map(Utf8 text, Collector<Pair<Utf8,Long>> collector)
throws IOException {
StringTokenizer tokens = new StringTokenizer(text.toString());
@@ -52,6 +56,7 @@ public class WordCountTask
public static void main(String[] args) throws Exception {
new TetherTaskRunner(new WordCountTask()).join();
+ LOG.info("WordCountTask finished");
}
}
Modified: avro/trunk/lang/java/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/pom.xml (original)
+++ avro/trunk/lang/java/pom.xml Tue Oct 7 14:02:17 2014
@@ -58,8 +58,12 @@
<velocity.version>1.7</velocity.version>
<maven.version>2.0.10</maven.version>
<ant.version>1.9.0</ant.version>
- <commons-lang.version>2.6</commons-lang.version>
+ <commons-cli.version>1.2</commons-cli.version>
+ <commons-codec.version>1.9</commons-codec.version>
<commons-compress.version>1.8.1</commons-compress.version>
+ <commons-httpclient.version>3.1</commons-httpclient.version>
+ <commons-lang.version>2.6</commons-lang.version>
+ <commons-logging.version>1.1.1</commons-logging.version>
<tukaani.version>1.5</tukaani.version>
<easymock.version>3.2</easymock.version>
<hamcrest.version>1.3</hamcrest.version>
@@ -167,6 +171,18 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${jar-plugin.version}</version>
+
+ <!--We want to be able to reuse the test-jars for mapred
+ to test avro-tool
+ see http://maven.apache.org/guides/mini/guide-attached-tests.html
+ -->
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
<configuration>
<archive>
<manifest>
Modified: avro/trunk/lang/java/tools/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/pom.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/pom.xml (original)
+++ avro/trunk/lang/java/tools/pom.xml Tue Oct 7 14:02:17 2014
@@ -117,6 +117,31 @@
<artifactId>avro-mapred</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!--For testing TetherTool we need the mapred test jar
+ because that contains the word count example.-->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>${commons-cli.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>${commons-logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>${commons-httpclient.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>trevni-core</artifactId>
Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java Tue Oct 7 14:02:17 2014
@@ -20,12 +20,9 @@ package org.apache.avro.tool;
import java.io.File;
import java.io.InputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.List;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.tether.TetherJob;
@@ -34,6 +31,15 @@ import org.apache.hadoop.mapred.FileInpu
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+
+
@SuppressWarnings("deprecation")
public class TetherTool implements Tool {
public TetherJob job;
@@ -52,38 +58,116 @@ public class TetherTool implements Tool
public int run(InputStream ins, PrintStream outs, PrintStream err,
List<String> args) throws Exception {
- OptionParser p = new OptionParser();
- OptionSpec<File> exec = p
- .accepts("program", "executable program, usually in HDFS")
- .withRequiredArg().ofType(File.class);
- OptionSpec<String> in = p.accepts("in", "comma-separated input paths")
- .withRequiredArg().ofType(String.class);
- OptionSpec<Path> out = p.accepts("out", "output directory")
- .withRequiredArg().ofType(Path.class);
- OptionSpec<File> outSchema = p.accepts("outschema", "output schema file")
- .withRequiredArg().ofType(File.class);
- OptionSpec<File> mapOutSchema = p
- .accepts("outschemamap", "map output schema file, if different")
- .withOptionalArg().ofType(File.class);
- OptionSpec<Integer> reduces = p.accepts("reduces", "number of reduces")
- .withOptionalArg().ofType(Integer.class);
+ String[] argarry = args.toArray(new String[0]);
+ Options opts = new Options();
+
+ Option helpopt = OptionBuilder.hasArg(false)
+ .withDescription("print this message")
+ .create("help");
+
+ Option inopt = OptionBuilder.hasArg()
+ .isRequired()
+ .withDescription("comma-separated input paths")
+ .create("in");
+
+ Option outopt = OptionBuilder.hasArg()
+ .isRequired()
+ .withDescription("The output path.")
+ .create("out");
+
+ Option pargs = OptionBuilder.hasArg()
+ .withDescription("A string containing the command line arguments to pass to the tethered process. String should be enclosed in quotes")
+ .create("exec_args");
+
+ Option popt = OptionBuilder.hasArg()
+ .isRequired()
+ .withDescription("executable program, usually in HDFS")
+ .create("program");
+
+ Option outscopt = OptionBuilder.withType(File.class).hasArg()
+ .isRequired()
+ .withDescription("schema file for output of reducer")
+ .create("outschema");
+
+ Option outscmapopt = OptionBuilder.withType(File.class).hasArg()
+ .withDescription("(optional) map output schema file, if different from outschema")
+ .create("outschemamap");
+
+ Option redopt = OptionBuilder.withType(Integer.class).hasArg()
+ .withDescription("(optional) number of reducers")
+ .create("reduces");
+
+ Option cacheopt = OptionBuilder.withType(Boolean.class).hasArg()
+ .withDescription("(optional) boolean indicating whether or not the exectuable should be distributed via distributed cache")
+ .create("exec_cached");
+
+ Option protoopt = OptionBuilder.hasArg()
+ .withDescription("(optional) specifies the transport protocol 'http' or 'sasl'")
+ .create("protocol");
+
+ opts.addOption(redopt);
+ opts.addOption(outscopt);
+ opts.addOption(popt);
+ opts.addOption(pargs);
+ opts.addOption(inopt);
+ opts.addOption(outopt);
+ opts.addOption(helpopt);
+ opts.addOption(outscmapopt);
+ opts.addOption(cacheopt);
+ opts.addOption(protoopt);
+
+ CommandLineParser parser = new GnuParser();
+
+ String[] genargs = null;
+ CommandLine line = null;
+ HelpFormatter formatter = new HelpFormatter();
JobConf job = new JobConf();
try {
- OptionSet opts = p.parse(args.toArray(new String[0]));
- FileInputFormat.addInputPaths(job, in.value(opts));
- FileOutputFormat.setOutputPath(job, out.value(opts));
- TetherJob.setExecutable(job, exec.value(opts));
- job.set(AvroJob.OUTPUT_SCHEMA, Schema.parse(outSchema.value(opts))
- .toString());
- if (opts.hasArgument(mapOutSchema))
+ line = parser.parse(opts, argarry);
+
+ if (line.hasOption("help")) {
+ formatter.printHelp("tether", opts );
+ return 0;
+ }
+
+ genargs = line.getArgs();
+
+ FileInputFormat.addInputPaths(job, line.getOptionValue("in"));
+ FileOutputFormat.setOutputPath(job,new Path (line.getOptionValue("out")));
+
+ List<String> exargs = null;
+ Boolean cached = false;
+
+ if (line.hasOption("exec_args")) {
+ String[] splitargs = line.getOptionValue("exec_args").split(" ");
+ exargs = new ArrayList<String>();
+ for (String item: splitargs){
+ exargs.add(item);
+ }
+ }
+ if (line.hasOption("exec_cached")) {
+ cached = Boolean.parseBoolean(line.getOptionValue("exec_cached"));
+ }
+ TetherJob.setExecutable(job, new File(line.getOptionValue("program")), exargs, cached);
+
+ File outschema = (File)line.getParsedOptionValue("outschema");
+ job.set(AvroJob.OUTPUT_SCHEMA, Schema.parse(outschema).toString());
+ if (line.hasOption("outschemamap")) {
job.set(AvroJob.MAP_OUTPUT_SCHEMA,
- Schema.parse(mapOutSchema.value(opts)).toString());
- if (opts.hasArgument(reduces))
- job.setNumReduceTasks(reduces.value(opts));
- } catch (Exception e) {
- p.printHelpOn(err);
+ Schema.parse((File)line.getParsedOptionValue("outschemamap")).toString());
+ }
+ if (line.hasOption("reduces")) {
+ job.setNumReduceTasks((Integer)line.getParsedOptionValue("reduces"));
+ }
+ if (line.hasOption("protocol")) {
+ TetherJob.setProtocol(job, line.getOptionValue("protocol"));
+ }
+ }
+ catch (Exception exp) {
+ System.out.println("Unexpected exception: " + exp.getMessage());
+ formatter.printHelp("tether", opts );
return -1;
}
Added: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java (added)
+++ avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java Tue Oct 7 14:02:17 2014
@@ -0,0 +1,109 @@
+package org.apache.avro.tool;
+
+import static org.junit.Assert.assertEquals;
+
+import static java.util.Arrays.asList;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.FileWriter;
+
+
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.mapred.WordCountUtil;
+import org.apache.avro.mapred.tether.TetherJob;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+public class TestTetherTool {
+
+ /**
+ * Test that the tether tool works with the mapreduce example
+ *
+ * TODO: How can we ensure that when we run, the WordCountTether example has
+ * been properly compiled?
+ */
+ @Test
+ public void test() throws Exception {
+
+ // Create the schema files.
+ Schema outscheme = new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema();
+
+ // we need to write the schemas to a file
+ File midscfile = AvroTestUtil.tempFile(getClass(), "midschema.avpr");
+
+ FileWriter hf = null;
+ try {
+ hf =new FileWriter(midscfile);
+ hf.write(outscheme.toString());
+ }
+ finally {
+ if (hf != null) {
+ hf.close();
+ }
+ }
+
+ // Get the classpath to use as an argument.
+ String cp = System.getProperty("java.class.path");
+
+ JobConf job = new JobConf();
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+
+ outputPath.getFileSystem(job).delete(outputPath);
+
+ // create the input file
+ WordCountUtil.writeLinesFile();
+
+ // Executable is java? Argument will be WordCountTask.java - Is the classpath
+ // set appropriately automatically?
+ java.net.URI exec = new java.net.URI("java");
+ //input path
+ String in = dir + "/in";
+
+ // create a string of the arguments
+ String execargs = "-classpath " + System.getProperty("java.class.path");
+ execargs += " org.apache.avro.mapred.tether.WordCountTask";
+
+ // Create a list of the arguments to pass to the tool's run method
+ java.util.List<String> runargs = new java.util.ArrayList<String> ();
+
+
+ runargs.addAll(java.util.Arrays.asList("--program", "java"));
+ runargs.addAll(asList("--exec_args", '"'+execargs+'"'));
+ runargs.addAll(asList("--exec_cached", "false"));
+ runargs.addAll(asList("--in", in));
+ runargs.addAll(asList("--out", outputPath.toString()));
+ runargs.addAll(asList("--outschema", midscfile.toString()));
+
+ TetherTool tool = new TetherTool();
+
+ tool.run(null, null, System.err, runargs);
+
+ // TODO:: We should probably do some validation
+ // validate the output
+ DatumReader<Pair<Utf8,Long>> reader = new SpecificDatumReader<Pair<Utf8,Long>>();
+ InputStream cin = new BufferedInputStream(new FileInputStream(WordCountUtil.COUNTS_FILE));
+ DataFileStream<Pair<Utf8,Long>> counts = new DataFileStream<Pair<Utf8,Long>>(cin,reader);
+ int numWords = 0;
+ for (Pair<Utf8,Long> wc : counts) {
+ assertEquals(wc.key().toString(),
+ WordCountUtil.COUNTS.get(wc.key().toString()), wc.value());
+ numWords++;
+ }
+ cin.close();
+ assertEquals(WordCountUtil.COUNTS.size(), numWords);
+ }
+}
Propchange: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTetherTool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: avro/trunk/lang/py/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Oct 7 14:02:17 2014
@@ -1,3 +1,4 @@
avro.egg-info
build
+userlogs
MANIFEST
Modified: avro/trunk/lang/py/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/build.xml?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/build.xml (original)
+++ avro/trunk/lang/py/build.xml Tue Oct 7 14:02:17 2014
@@ -77,6 +77,12 @@
<fileset dir="${lib.dir}" />
</copy>
+ <!--Copy the protocols used for tethering -->
+ <copy todir="${build.dir}/src/avro/tether">
+ <fileset dir="${share.schema.dir}/org/apache/avro/mapred/tether/">
+ <include name="*.avpr"/>
+ </fileset>
+ </copy>
<!-- Inline the handshake schemas -->
<copy file="${src.dir}/avro/ipc.py"
toFile="${build.dir}/src/avro/ipc.py"
@@ -120,6 +126,16 @@
<filter token="INTEROP_DATA_DIR" value="${interop.data.dir}"/>
</filterset>
</copy>
+
+ <!-- Inline the location of the tools jar -->
+ <copy file="${test.dir}/test_tether_word_count.py"
+ toFile="${build.dir}/test/test_tether_word_count.py"
+ overwrite="true">
+ <filterset>
+ <filter token="AVRO_VERSION" value="${avro.version}"/>
+ <filter token="TOPDIR" value="${basedir}"/>
+ </filterset>
+ </copy>
</target>
<target name="test"
@@ -135,6 +151,22 @@
</py-test>
</target>
+ <!--Created a unittest to run just the tests for tethered jobs.
+ -->
+ <target name="test-tether"
+ description="Run unit tests for a hadoop python-tethered job."
+ depends="build">
+ <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask"
+ classpathref="java.classpath"/>
+ <py-test python="${python}" pythonpathref="test.path">
+ <fileset dir="${build.dir}/test">
+ <include name="test_tether*.py"/>
+ <!--<exclude name="test_datafile_interop.py"/>-->
+ </fileset>
+ </py-test>
+ </target>
+
+
<target name="interop-data-test"
description="Run python interop data tests"
depends="build">
Added: avro/trunk/lang/py/src/avro/tether/__init__.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/__init__.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/__init__.py (added)
+++ avro/trunk/lang/py/src/avro/tether/__init__.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,7 @@
+from .util import *
+from .tether_task import *
+from .tether_task_runner import *
+
+__all__=util.__all__
+__all__+=tether_task.__all__
+__all__+=tether_task_runner.__all__
Propchange: avro/trunk/lang/py/src/avro/tether/__init__.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/src/avro/tether/tether_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/tether_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/tether_task.py (added)
+++ avro/trunk/lang/py/src/avro/tether/tether_task.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,498 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"]
+
+from avro import schema, protocol
+from avro import io as avio
+from avro import ipc
+
+import io as pyio
+import sys
+import os
+import traceback
+import logging
+import collections
+from StringIO import StringIO
+import threading
+
+
+# create protocol objects for the input and output protocols
+# The build process should copy InputProtocol.avpr and OutputProtocol.avpr
+# into the same directory as this module
+inputProtocol=None
+outputProtocol=None
+
+TaskType=None
+if (inputProtocol is None):
+ pfile=os.path.split(__file__)[0]+os.sep+"InputProtocol.avpr"
+
+ if not(os.path.exists(pfile)):
+ raise Exception("Could not locate the InputProtocol: {0} does not exist".format(pfile))
+
+ with file(pfile,'r') as hf:
+ prototxt=hf.read()
+
+ inputProtocol=protocol.parse(prototxt)
+
+ # use a named tuple to represent the tasktype enumeration
+ taskschema=inputProtocol.types_dict["TaskType"]
+ _ttype=collections.namedtuple("_tasktype",taskschema.symbols)
+ TaskType=_ttype(*taskschema.symbols)
+
+if (outputProtocol is None):
+ pfile=os.path.split(__file__)[0]+os.sep+"OutputProtocol.avpr"
+
+ if not(os.path.exists(pfile)):
+ raise Exception("Could not locate the OutputProtocol: {0} does not exist".format(pfile))
+
+ with file(pfile,'r') as hf:
+ prototxt=hf.read()
+
+ outputProtocol=protocol.parse(prototxt)
+
+class Collector(object):
+ """
+ Collector for map and reduce output values
+ """
+ def __init__(self,scheme=None,outputClient=None):
+ """
+
+ Parameters
+ ---------------------------------------------
+ scheme - The scheme for the datums to output - can be a json string
+ - or an instance of Schema
+ outputClient - The output client used to send messages to the parent
+ """
+
+ if not(isinstance(scheme,schema.Schema)):
+ scheme=schema.parse(scheme)
+
+ if (outputClient is None):
+ raise ValueError("output client can't be none.")
+
+ self.scheme=scheme
+ self.buff=StringIO()
+ self.encoder=avio.BinaryEncoder(self.buff)
+
+ self.datum_writer = avio.DatumWriter(writers_schema=self.scheme)
+ self.outputClient=outputClient
+
+ def collect(self,record,partition=None):
+ """Collect a map or reduce output value
+
+ Parameters
+ ------------------------------------------------------
+ record - The record to write
+ partition - Indicates the partition for a pre-partitioned map output
+ - currently not supported
+ """
+
+ self.buff.truncate(0)
+ self.datum_writer.write(record, self.encoder);
+ self.buff.flush();
+ self.buff.seek(0)
+
+ # the buffer was cleared (truncate above) before the datum was written; now send the encoded datum to the parent
+ if (partition is None):
+
+ # TODO: Is there a more efficient way to read the data in self.buff?
+ # we could use self.buff.read() but that returns the byte array as a string
+ # will that work? We can also use self.buff.readinto to read it into
+ # a bytearray but the byte array must be pre-allocated
+ # self.outputClient.output(self.buff.buffer.read())
+
+ #its not a StringIO
+ self.outputClient.request("output",{"datum":self.buff.read()})
+ else:
+ self.outputClient.request("outputPartitioned",{"datum":self.buff.read(),"partition":partition})
+
+
+
+def keys_are_equal(rec1,rec2,fkeys):
+ """Check if the "keys" in two records are equal. The key fields
+ are all fields for which order isn't marked ignore.
+
+ Parameters
+ -------------------------------------------------------------------------
+ rec1 - The first record
+ rec2 - The second record
+ fkeys - A list of the fields to compare
+ """
+
+ for f in fkeys:
+ if not(rec1[f]==rec2[f]):
+ return False
+
+ return True
+
+
+class HTTPRequestor(object):
+ """
+ This is a small requestor wrapper class I created for the HTTP protocol.
+ Since the HTTP protocol isn't persistent, we need to instantiate
+ a new transceiver and a new requestor for each request.
+ But I wanted the use of the requestor to be identical to that for
+ SocketTransciever so that we can seamlessly switch between the two.
+ """
+
+ def __init__(self, server,port,protocol):
+ """
+ Instantiate the class.
+
+ Parameters
+ ----------------------------------------------------------------------
+ server - The server hostname
+ port - Which port to use
+ protocol - The protocol for the communication
+ """
+
+ self.server=server
+ self.port=port
+ self.protocol=protocol
+
+ def request(self,*args,**param):
+ transciever=ipc.HTTPTransceiver(self.server,self.port)
+ requestor=ipc.Requestor(self.protocol, transciever)
+ return requestor.request(*args,**param)
+
+
+class TetherTask(object):
+ """
+ Base class for python tether mapreduce programs.
+
+ ToDo: Currently the subclass has to implement both reduce and reduceFlush.
+ This is not very pythonic. A pythonic way to implement the reducer
+ would be to pass the reducer a generator (as dumbo does) so that the user
+ could iterate over the records for the given key.
+ How would we do this. I think we would need to have two threads, one thread would run
+ the user's reduce function. This loop would be suspended when no reducer records were available.
+ The other thread would read in the records for the reducer. This thread should
+ only buffer so many records at a time (i.e if the buffer is full, self.input shouldn't return right
+ away but wait for space to free up)
+ """
+
+ def __init__(self,inschema=None,midschema=None,outschema=None):
+ """
+
+ Parameters
+ ---------------------------------------------------------
+ inschema - The scheme for the input to the mapper
+ midschema - The scheme for the output of the mapper
+ outschema - The scheme for the output of the reducer
+
+ An example scheme for the prototypical word count example would be
+ inscheme='{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+ {"name":"key","type":"string"},
+ {"name":"value","type":"long","order":"ignore"}]
+ }'
+
+ Important: The records are split into (key,value) pairs as required by map reduce
+ by using all fields whose "order" is not "ignore" for the key and the remaining fields for the value.
+
+ The subclass provides these schemas in order to tell this class which schemas it expects.
+ The configure request will also provide the schemas that the parent process is using.
+ This allows us to check whether the schemas match and if not whether we can resolve
+ the differences (see http://avro.apache.org/docs/current/spec.html#Schema+Resolution))
+
+ """
+
+
+ if (inschema is None):
+ raise ValueError("inschema can't be None")
+
+ if (midschema is None):
+ raise ValueError("midschema can't be None")
+
+ if (outschema is None):
+ raise ValueError("outschema can't be None")
+
+ # make sure we can parse the schemas
+ # Should we call fail if we can't parse the schemas?
+ self.inschema=schema.parse(inschema)
+ self.midschema=schema.parse(midschema)
+ self.outschema=schema.parse(outschema)
+
+
+ # declare various variables
+ self.clienTransciever=None
+
+ # output client is used to communicate with the parent process
+ # in particular to transmit the outputs of the mapper and reducer
+ self.outputClient = None
+
+ # collectors for the output of the mapper and reducer
+ self.midCollector=None
+ self.outCollector=None
+
+ self._partitions=None
+
+ # cache a list of the fields used by the reducer as the keys
+ # we need the fields to decide when we have finished processing all values for
+ # a given key. We cache the fields to be more efficient
+ self._red_fkeys=None
+
+ # We need to keep track of the previous record fed to the reducer
+ # b\c we need to be able to determine when we start processing a new group
+ # in the reducer
+ self.midRecord=None
+
+ # create an event object to signal when
+ # http server is ready to be shutdown
+ self.ready_for_shutdown=threading.Event()
+ self.log=logging.getLogger("TetherTask")
+
+ def open(self, inputport,clientPort=None):
+ """Open the output client - i.e the connection to the parent process
+
+ Parameters
+ ---------------------------------------------------------------
+ inputport - This is the port that the subprocess is listening on. i.e the
+ subprocess starts a server listening on this port to accept requests from
+ the parent process
+ clientPort - The port on which the server in the parent process is listening
+ - If this is None we look for the environment variable AVRO_TETHER_OUTPUT_PORT
+ - This is mainly provided for debugging purposes. In practice
+ we want to use the environment variable
+
+ """
+
+
+ # Open the connection to the parent process
+ # The port the parent process is listening on is set in the environment
+ # variable AVRO_TETHER_OUTPUT_PORT
+ # open output client, connecting to parent
+
+ if (clientPort is None):
+ clientPortString = os.getenv("AVRO_TETHER_OUTPUT_PORT")
+ if (clientPortString is None):
+ raise Exception("AVRO_TETHER_OUTPUT_PORT env var is not set")
+
+ clientPort = int(clientPortString)
+
+ self.log.info("TetherTask.open: Opening connection to parent server on port={0}".format(clientPort))
+
+ # We use the HTTP protocol although we hope to shortly have
+ # support for SocketServer,
+ usehttp=True
+
+ if(usehttp):
+ # self.outputClient = ipc.Requestor(outputProtocol, self.clientTransceiver)
+ # since HTTP is stateless, a new transciever
+ # is created and closed for each request. We therefore set clientTransciever to None
+ # We still declare clientTransciever because for other (state) protocols we will need
+ # it and we want to check when we get the message fail whether the transciever
+ # needs to be closed.
+ # self.clientTranciever=None
+ self.outputClient = HTTPRequestor("127.0.0.1",clientPort,outputProtocol)
+
+ else:
+ raise NotImplementedError("Only http protocol is currently supported")
+
+ try:
+ self.outputClient.request('configure',{"port":inputport})
+ except Exception as e:
+ estr= traceback.format_exc()
+ self.fail(estr)
+
+
+ def configure(self,taskType, inSchemaText, outSchemaText):
+ """
+
+ Parameters
+ -------------------------------------------------------------------
+ taskType - What type of task (e.g map, reduce)
+ - This is an enumeration which is specified in the input protocol
+ inSchemaText - string containing the input schema
+ - This is the actual schema with which the data was encoded
+ i.e it is the writer_schema (see http://avro.apache.org/docs/current/spec.html#Schema+Resolution)
+ This is the schema the parent process is using which might be different
+ from the one provided by the subclass of tether_task
+
+ outSchemaText - string containing the output scheme
+ - This is the schema expected by the parent process for the output
+ """
+ self.taskType = taskType
+
+ try:
+ inSchema = schema.parse(inSchemaText)
+ outSchema = schema.parse(outSchemaText)
+
+ if (taskType==TaskType.MAP):
+ self.inReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
+ self.midCollector=Collector(outSchemaText,self.outputClient)
+
+ elif(taskType==TaskType.REDUCE):
+ self.midReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
+ # this.outCollector = new Collector<OUT>(outSchema);
+ self.outCollector=Collector(outSchemaText,self.outputClient)
+
+ # determine which fields in the input record are the keys for the reducer
+ self._red_fkeys=[f.name for f in self.midschema.fields if not(f.order=='ignore')]
+
+ except Exception as e:
+
+ estr= traceback.format_exc()
+ self.fail(estr)
+
+ def set_partitions(self,npartitions):
+
+ try:
+ self._partitions=npartitions
+ except Exception as e:
+ estr= traceback.format_exc()
+ self.fail(estr)
+
+ def get_partitions():
+ """ Return the number of map output partitions of this job."""
+ return self._partitions
+
+ def input(self,data,count):
+ """ Receive input from the server
+
+ Parameters
+ ------------------------------------------------------
+ data - Should contain the bytes encoding the serialized data
+ - I think this gets represented as a string
+ count - how many input records are provided in the binary stream
+ """
+ try:
+ # to avio.BinaryDecoder
+ bdata=StringIO(data)
+ decoder = avio.BinaryDecoder(bdata)
+
+ for i in range(count):
+ if (self.taskType==TaskType.MAP):
+ inRecord = self.inReader.read(decoder)
+
+ # Do we need to pass midCollector if its declared as an instance variable
+ self.map(inRecord, self.midCollector)
+
+ elif (self.taskType==TaskType.REDUCE):
+
+ # store the previous record
+ prev = self.midRecord
+
+ # read the new record
+ self.midRecord = self.midReader.read(decoder);
+ if (prev != None and not(keys_are_equal(self.midRecord,prev,self._red_fkeys))):
+ # since the key has changed we need to finalize the processing
+ # for this group of key,value pairs
+ self.reduceFlush(prev, self.outCollector)
+ self.reduce(self.midRecord, self.outCollector)
+
+ except Exception as e:
+ estr= traceback.format_exc()
+ self.log.warning("failing: "+estr)
+ self.fail(estr)
+
+ def complete(self):
+ """
+ Process the complete request
+ """
+ if ((self.taskType == TaskType.REDUCE ) and not(self.midRecord is None)):
+ try:
+ self.reduceFlush(self.midRecord, self.outCollector);
+ except Exception as e:
+ estr=traceback.format_exc()
+ self.log.warning("failing: "+estr);
+ self.fail(estr)
+
+ self.outputClient.request("complete",dict())
+
+ def map(self,record,collector):
+ """Called with input values to generate intermediate values (i.e mapper output).
+
+ Parameters
+ ----------------------------------------------------------------------------
+ record - The input record
+ collector - The collector to collect the output
+
+ This is an abstract function which should be overloaded by the application specific
+ subclass.
+ """
+
+ raise NotImplementedError("This is an abstract method which should be overloaded in the subclass")
+
+ def reduce(self,record, collector):
+ """ Called with input values to generate reducer output. Inputs are sorted by the mapper
+ key.
+
+ The reduce function is invoked once for each value belonging to a given key outputted
+ by the mapper.
+
+ Parameters
+ ----------------------------------------------------------------------------
+ record - The mapper output
+ collector - The collector to collect the output
+
+ This is an abstract function which should be overloaded by the application specific
+ subclass.
+ """
+
+ raise NotImplementedError("This is an abstract method which should be overloaded in the subclass")
+
+ def reduceFlush(self,record, collector):
+ """
+ Called with the last intermediate value in each equivalence run.
+ In other words, reduceFlush is invoked once for each key produced in the reduce
+ phase. It is called after reduce has been invoked on each value for the given key.
+
+ Parameters
+ ------------------------------------------------------------------
+ record - the last record on which reduce was invoked.
+ """
+ raise NotImplementedError("This is an abstract method which should be overloaded in the subclass")
+
+ def status(self,message):
+ """
+ Called to update task status
+ """
+ self.outputClient.request("status",{"message":message})
+
+ def count(self,group, name, amount):
+ """
+ Called to increment a counter
+ """
+ self.outputClient.request("count",{"group":group, "name":name, "amount":amount})
+
+ def fail(self,message):
+ """
+ Call to fail the task.
+ """
+ self.log.error("TetherTask.fail: failure occured message follows:\n{0}".format(message))
+ try:
+ self.outputClient.request("fail",{"message":message})
+ except Exception as e:
+ estr=traceback.format_exc()
+ self.log.error("TetherTask.fail: an exception occured while trying to send the fail message to the output server:\n{0}".format(estr))
+
+ self.close()
+
+ def close(self):
+ self.log.info("TetherTask.close: closing")
+ if not(self.clienTransciever is None):
+ try:
+ self.clienTransciever.close()
+
+ except Exception as e:
+ # ignore exceptions
+ pass
+
+ # http server is ready to be shutdown
+ self.ready_for_shutdown.set()
Propchange: avro/trunk/lang/py/src/avro/tether/tether_task.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/src/avro/tether/tether_task_runner.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/tether_task_runner.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/tether_task_runner.py (added)
+++ avro/trunk/lang/py/src/avro/tether/tether_task_runner.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,227 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["TaskRunner"]
+
+if __name__ == "__main__":
+ # Relative imports don't work when being run directly
+ from avro import tether
+ from avro.tether import TetherTask, find_port, inputProtocol
+
+else:
+ from . import TetherTask, find_port, inputProtocol
+
+from avro import ipc
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import logging
+import weakref
+import threading
+import sys
+import traceback
+
+class TaskRunnerResponder(ipc.Responder):
+ """
+ The responder for the thethered process
+ """
+ def __init__(self,runner):
+ """
+ Param
+ ----------------------------------------------------------
+ runner - Instance of TaskRunner
+ """
+ ipc.Responder.__init__(self, inputProtocol)
+
+ self.log=logging.getLogger("TaskRunnerResponder")
+
+ # should we use weak references to avoid circular references?
+ # We use weak references b\c self.runner owns this instance of TaskRunnerResponder
+ if isinstance(runner,weakref.ProxyType):
+ self.runner=runner
+ else:
+ self.runner=weakref.proxy(runner)
+
+ self.task=weakref.proxy(runner.task)
+
+ def invoke(self, message, request):
+ try:
+ if message.name=='configure':
+ self.log.info("TetherTaskRunner: Recieved configure")
+ self.task.configure(request["taskType"],request["inSchema"],request["outSchema"])
+ elif message.name=='partitions':
+ self.log.info("TetherTaskRunner: Recieved partitions")
+ try:
+ self.task.set_partitions(request["partitions"])
+ except Exception as e:
+ self.log.error("Exception occured while processing the partitions message: Message:\n"+traceback.format_exc())
+ raise
+ elif message.name=='input':
+ self.log.info("TetherTaskRunner: Recieved input")
+ self.task.input(request["data"],request["count"])
+ elif message.name=='abort':
+ self.log.info("TetherTaskRunner: Recieved abort")
+ self.runner.close()
+ elif message.name=='complete':
+ self.log.info("TetherTaskRunner: Recieved complete")
+ self.task.complete()
+ self.task.close()
+ self.runner.close()
+ else:
+ self.log.warning("TetherTaskRunner: recieved unknown message {0}".format(message.name))
+
+ except Exception as e:
+ self.log.error("Error occured while processing message: {0}".format(message.name))
+ emsg=traceback.format_exc()
+ self.task.fail(emsg)
+
+ return None
+
+
+def HTTPHandlerGen(runner):
+ """
+ This is a class factory for the HTTPHandler. We need
+ a factory b\c we need a reference to the runner
+
+ Parameters
+ -----------------------------------------------------------------
+ runner - instance of the task runner
+ """
+
+ if not(isinstance(runner,weakref.ProxyType)):
+ runnerref=weakref.proxy(runner)
+ else:
+ runnerref=runner
+
+ class TaskRunnerHTTPHandler(BaseHTTPRequestHandler):
+ """Create a handler for the parent.
+ """
+
+ runner=runnerref
+ def __init__(self,*args,**param):
+ """
+ """
+ BaseHTTPRequestHandler.__init__(self,*args,**param)
+
+ def do_POST(self):
+ self.responder =TaskRunnerResponder(self.runner)
+ call_request_reader = ipc.FramedReader(self.rfile)
+ call_request = call_request_reader.read_framed_message()
+ resp_body = self.responder.respond(call_request)
+ self.send_response(200)
+ self.send_header('Content-Type', 'avro/binary')
+ self.end_headers()
+ resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer.write_framed_message(resp_body)
+
+ return TaskRunnerHTTPHandler
+
+class TaskRunner(object):
+ """This class ties together the server handling the requests from
+ the parent process and the instance of TetherTask which actually
+ implements the logic for the mapper and reducer phases
+ """
+
+ def __init__(self,task):
+ """
+ Construct the runner
+
+ Parameters
+ ---------------------------------------------------------------
+ task - An instance of tether task
+ """
+
+ self.log=logging.getLogger("TaskRunner:")
+
+ if not(isinstance(task,TetherTask)):
+ raise ValueError("task must be an instance of tether task")
+ self.task=task
+
+ self.server=None
+ self.sthread=None
+
+ def start(self,outputport=None,join=True):
+ """
+ Start the server
+
+ Parameters
+ -------------------------------------------------------------------
+ outputport - (optional) The port on which the parent process is listening
+ for requests from the task.
+ - This will typically be supplied by an environment variable
+ we allow it to be supplied as an argument mainly for debugging
+ join - (optional) If set to fault then we don't issue a join to block
+ until the thread excecuting the server terminates.
+ This is mainly for debugging. By setting it to false,
+ we can resume execution in this thread so that we can do additional
+ testing
+ """
+
+ port=find_port()
+ address=("localhost",port)
+
+
+ def thread_run(task_runner=None):
+ task_runner.server = HTTPServer(address, HTTPHandlerGen(task_runner))
+ task_runner.server.allow_reuse_address = True
+ task_runner.server.serve_forever()
+
+ # create a separate thread for the http server
+ sthread=threading.Thread(target=thread_run,kwargs={"task_runner":self})
+ sthread.start()
+
+ self.sthread=sthread
+ # This needs to run in a separat thread b\c serve_forever() blocks
+ self.task.open(port,clientPort=outputport)
+
+ # wait for the other thread to finish
+ if (join):
+ self.task.ready_for_shutdown.wait()
+ self.server.shutdown()
+
+ # should we do some kind of check to make sure it exits
+ self.log.info("Shutdown the logger")
+ # shutdown the logging
+ logging.shutdown()
+
+ def close(self):
+ """
+ Handler for the close message
+ """
+
+ self.task.close()
+
+if __name__ == '__main__':
+ # TODO::Make the logging level a parameter we can set
+ # logging.basicConfig(level=logging.INFO,filename='/tmp/log',filemode='w')
+ logging.basicConfig(level=logging.INFO)
+
+ if (len(sys.argv)<=1):
+ print "Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass"
+ raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")
+
+ fullcls=sys.argv[1]
+ mod,cname=fullcls.rsplit(".",1)
+
+ logging.info("tether_task_runner.__main__: Task: {0}".format(fullcls))
+
+ modobj=__import__(mod,fromlist=cname)
+
+ taskcls=getattr(modobj,cname)
+ task=taskcls()
+
+ runner=TaskRunner(task=task)
+ runner.start()
Propchange: avro/trunk/lang/py/src/avro/tether/tether_task_runner.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/src/avro/tether/util.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/tether/util.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/src/avro/tether/util.py (added)
+++ avro/trunk/lang/py/src/avro/tether/util.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,34 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["find_port"]
+
+import socket
+
+
+def find_port():
+ """
+ Return an unbound port
+ """
+ s=socket.socket()
+ s.bind(("127.0.0.1",0))
+
+ port=s.getsockname()[1]
+ s.close()
+
+ return port
\ No newline at end of file
Propchange: avro/trunk/lang/py/src/avro/tether/util.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/test/mock_tether_parent.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/mock_tether_parent.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/mock_tether_parent.py (added)
+++ avro/trunk/lang/py/test/mock_tether_parent.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import set_avro_test_path
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from avro import ipc
+from avro import protocol
+from avro import tether
+
+import socket
+
+def find_port():
+ """
+ Return an unbound port
+ """
+ s=socket.socket()
+ s.bind(("127.0.0.1",0))
+
+ port=s.getsockname()[1]
+ s.close()
+
+ return port
+
+# Address of the mock parent server. The port is picked by find_port() when this
+# module is imported; the __main__ block replaces it with the port from the command line.
+SERVER_ADDRESS = ('localhost', find_port())
+
+class MockParentResponder(ipc.Responder):
+ """
+ The responder for the mocked parent
+ """
+ def __init__(self):
+ ipc.Responder.__init__(self, tether.outputProtocol)
+
+ def invoke(self, message, request):
+ if message.name=='configure':
+ print "MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"])
+
+ elif message.name=='status':
+ print "MockParentResponder: Recieved 'status': message={0}".format(request["message"])
+ elif message.name=='fail':
+ print "MockParentResponder: Recieved 'fail': message={0}".format(request["message"])
+ else:
+ print "MockParentResponder: Recieved {0}".format(message.name)
+
+ # flush the output so it shows up in the parent process
+ sys.stdout.flush()
+
+ return None
+
+class MockParentHandler(BaseHTTPRequestHandler):
+ """Create a handler for the parent.
+ """
+ def do_POST(self):
+ self.responder =MockParentResponder()
+ call_request_reader = ipc.FramedReader(self.rfile)
+ call_request = call_request_reader.read_framed_message()
+ resp_body = self.responder.respond(call_request)
+ self.send_response(200)
+ self.send_header('Content-Type', 'avro/binary')
+ self.end_headers()
+ resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer.write_framed_message(resp_body)
+
+if __name__ == '__main__':
+ if (len(sys.argv)<=1):
+ raise ValueError("Usage: mock_tether_parent command")
+
+ cmd=sys.argv[1].lower()
+ if (sys.argv[1]=='start_server'):
+ if (len(sys.argv)==3):
+ port=int(sys.argv[2])
+ else:
+ raise ValueError("Usage: mock_tether_parent start_server port")
+
+ SERVER_ADDRESS=(SERVER_ADDRESS[0],port)
+ print "mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1])
+
+ # flush the output so it shows up in the parent process
+ sys.stdout.flush()
+ parent_server = HTTPServer(SERVER_ADDRESS, MockParentHandler)
+ parent_server.allow_reuse_address = True
+ parent_server.serve_forever()
Propchange: avro/trunk/lang/py/test/mock_tether_parent.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/test/set_avro_test_path.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/set_avro_test_path.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/set_avro_test_path.py (added)
+++ avro/trunk/lang/py/test/set_avro_test_path.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,25 @@
+"""
+Module adjusts the path PYTHONPATH so the unittests
+will work even if an egg for AVRO is already installed.
+By default eggs always appear higher on Python's path than
+directories set via the environment variable PYTHONPATH.
+
+For reference see:
+http://www.velocityreviews.com/forums/t716589-pythonpath-and-eggs.html
+http://stackoverflow.com/questions/897792/pythons-sys-path-value.
+
+Unittests would therefore use the installed AVRO and not the AVRO
+being built. To work around this the unittests import this module before
+importing AVRO. This module in turn adjusts the python path so that the test
+build of AVRO is higher on the path than any installed eggs.
+"""
+import sys
+import os
+
+# determine the build directory and then make sure all paths that start with the
+# build directory are at the top of the path
+builddir=os.path.split(os.path.split(__file__)[0])[0]
+bpaths=filter(lambda s:s.startswith(builddir), sys.path)
+
+for p in bpaths:
+ sys.path.insert(0,p)
\ No newline at end of file
Propchange: avro/trunk/lang/py/test/set_avro_test_path.py
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/py/test/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_datafile.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_datafile.py (original)
+++ avro/trunk/lang/py/test/test_datafile.py Tue Oct 7 14:02:17 2014
@@ -15,6 +15,9 @@
# limitations under the License.
import os
import unittest
+
+import set_avro_test_path
+
from avro import schema
from avro import io
from avro import datafile
Modified: avro/trunk/lang/py/test/test_datafile_interop.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_datafile_interop.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_datafile_interop.py (original)
+++ avro/trunk/lang/py/test/test_datafile_interop.py Tue Oct 7 14:02:17 2014
@@ -15,6 +15,9 @@
# limitations under the License.
import os
import unittest
+
+import set_avro_test_path
+
from avro import io
from avro import datafile
Modified: avro/trunk/lang/py/test/test_io.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_io.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_io.py (original)
+++ avro/trunk/lang/py/test/test_io.py Tue Oct 7 14:02:17 2014
@@ -19,6 +19,9 @@ try:
except ImportError:
from StringIO import StringIO
from binascii import hexlify
+
+import set_avro_test_path
+
from avro import schema
from avro import io
Modified: avro/trunk/lang/py/test/test_ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_ipc.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_ipc.py (original)
+++ avro/trunk/lang/py/test/test_ipc.py Tue Oct 7 14:02:17 2014
@@ -19,6 +19,8 @@ servers yet available.
"""
import unittest
+import set_avro_test_path
+
# This test does import this code, to make sure it at least passes
# compilation.
from avro import ipc
Modified: avro/trunk/lang/py/test/test_schema.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_schema.py?rev=1629897&r1=1629896&r2=1629897&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_schema.py (original)
+++ avro/trunk/lang/py/test/test_schema.py Tue Oct 7 14:02:17 2014
@@ -17,6 +17,8 @@
Test the schema parsing logic.
"""
import unittest
+import set_avro_test_path
+
from avro import schema
def print_test_name(test_name):
Added: avro/trunk/lang/py/test/test_tether_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_task.py (added)
+++ avro/trunk/lang/py/test/test_tether_task.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+import set_avro_test_path
+
+class TestTetherTask(unittest.TestCase):
+ """
+ TODO: We should validate the the server response by looking at stdout
+ """
+ def test1(self):
+ """
+ Test that the thether_task is working. We run the mock_tether_parent in a separate
+ subprocess
+ """
+ from avro import tether
+ from avro import io as avio
+ from avro import schema
+ from avro.tether import HTTPRequestor,inputProtocol, find_port
+
+ import StringIO
+ import mock_tether_parent
+ from word_count_task import WordCountTask
+
+ task=WordCountTask()
+
+ proc=None
+ try:
+ # launch the server in a separate process
+ # env["AVRO_TETHER_OUTPUT_PORT"]=output_port
+ env=dict()
+ env["PYTHONPATH"]=':'.join(sys.path)
+ server_port=find_port()
+
+ pyfile=mock_tether_parent.__file__
+ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
+ input_port=find_port()
+
+ print "Mock server started process pid={0}".format(proc.pid)
+ # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+ # so we give the subprocess time to start up
+ time.sleep(1)
+ task.open(input_port,clientPort=server_port)
+
+ # TODO: We should validate that open worked by grabbing the STDOUT of the subproces
+ # and ensuring that it outputted the correct message.
+
+ #***************************************************************
+ # Test the mapper
+ task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema))
+
+ # Serialize some data so we can send it to the input function
+ datum="This is a line of text"
+ writer = StringIO.StringIO()
+ encoder = avio.BinaryEncoder(writer)
+ datum_writer = avio.DatumWriter(task.inschema)
+ datum_writer.write(datum, encoder)
+
+ writer.seek(0)
+ data=writer.read()
+
+ # Call input to simulate calling map
+ task.input(data,1)
+
+ # Test the reducer
+ task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema))
+
+ # Serialize some data so we can send it to the input function
+ datum={"key":"word","value":2}
+ writer = StringIO.StringIO()
+ encoder = avio.BinaryEncoder(writer)
+ datum_writer = avio.DatumWriter(task.midschema)
+ datum_writer.write(datum, encoder)
+
+ writer.seek(0)
+ data=writer.read()
+
+ # Call input to simulate calling reduce
+ task.input(data,1)
+
+ task.complete()
+
+ # try a status
+ task.status("Status message")
+
+ except Exception as e:
+ raise
+ finally:
+ # close the process
+ if not(proc is None):
+ proc.kill()
+
+ pass
+
+# Run the tests in this module when the file is executed directly.
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
Propchange: avro/trunk/lang/py/test/test_tether_task.py
------------------------------------------------------------------------------
svn:eol-style = native