You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/10/07 16:02:18 UTC
svn commit: r1629897 [2/2] - in /avro/trunk: ./ lang/java/ lang/java/mapred/
lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/
lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/
lang/java/tools/ lang/java/tools/src/main/java/org/...
Added: avro/trunk/lang/py/test/test_tether_task_runner.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_task_runner.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_task_runner.py (added)
+++ avro/trunk/lang/py/test/test_tether_task_runner.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+import set_avro_test_path
+
+
+class TestTetherTaskRunner(unittest.TestCase):
+ """ unit test for a tethered task runner.
+ """
+
+ def test1(self):
+ from word_count_task import WordCountTask
+ from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
+ from avro import io as avio
+ import mock_tether_parent
+ import subprocess
+ import StringIO
+ import logging
+
+ # set the logging level to debug so that debug messages are printed
+ logging.basicConfig(level=logging.DEBUG)
+
+ proc=None
+ try:
+ # launch the server in a separate process
+ env=dict()
+ env["PYTHONPATH"]=':'.join(sys.path)
+ parent_port=find_port()
+
+ pyfile=mock_tether_parent.__file__
+ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+ input_port=find_port()
+
+ print "Mock server started process pid={0}".format(proc.pid)
+ # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+ # so we give the subprocess time to start up
+ time.sleep(1)
+
+ runner=TaskRunner(WordCountTask())
+
+ runner.start(outputport=parent_port,join=False)
+
+ # Test sending various messages to the server and ensuring they are
+ # processed correctly
+ requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol)
+
+ # TODO: We should validate that open worked by grabbing the STDOUT of the subproces
+ # and ensuring that it outputted the correct message.
+
+ # Test the mapper
+ requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)})
+
+ # Serialize some data so we can send it to the input function
+ datum="This is a line of text"
+ writer = StringIO.StringIO()
+ encoder = avio.BinaryEncoder(writer)
+ datum_writer = avio.DatumWriter(runner.task.inschema)
+ datum_writer.write(datum, encoder)
+
+ writer.seek(0)
+ data=writer.read()
+
+
+ # Call input to simulate calling map
+ requestor.request("input",{"data":data,"count":1})
+
+ #Test the reducer
+ requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)})
+
+ #Serialize some data so we can send it to the input function
+ datum={"key":"word","value":2}
+ writer = StringIO.StringIO()
+ encoder = avio.BinaryEncoder(writer)
+ datum_writer = avio.DatumWriter(runner.task.midschema)
+ datum_writer.write(datum, encoder)
+
+ writer.seek(0)
+ data=writer.read()
+
+
+ #Call input to simulate calling reduce
+ requestor.request("input",{"data":data,"count":1})
+
+ requestor.request("complete",{})
+
+
+ runner.task.ready_for_shutdown.wait()
+ runner.server.shutdown()
+ #time.sleep(2)
+ #runner.server.shutdown()
+
+ sthread=runner.sthread
+
+ #Possible race condition?
+ time.sleep(1)
+
+ #make sure the other thread terminated
+ self.assertFalse(sthread.isAlive())
+
+ #shutdown the logging
+ logging.shutdown()
+
+ except Exception as e:
+ raise
+ finally:
+ #close the process
+ if not(proc is None):
+ proc.kill()
+
+
+ def test2(self):
+ """
+ In this test we want to make sure that when we run "tether_task_runner.py"
+ as our main script everything works as expected. We do this by using subprocess to run it
+ in a separate thread.
+ """
+ from word_count_task import WordCountTask
+ from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
+ from avro.tether import tether_task_runner
+ from avro import io as avio
+ import mock_tether_parent
+ import subprocess
+ import StringIO
+
+
+ proc=None
+
+ runnerproc=None
+ try:
+ #launch the server in a separate process
+ env=dict()
+ env["PYTHONPATH"]=':'.join(sys.path)
+ parent_port=find_port()
+
+ pyfile=mock_tether_parent.__file__
+ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+
+ #Possible race condition? when we start tether_task_runner it will call
+ # open tries to connect to the subprocess before the subprocess is fully started
+ #so we give the subprocess time to start up
+ time.sleep(1)
+
+
+ #start the tether_task_runner in a separate process
+ env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
+ env["PYTHONPATH"]=':'.join(sys.path)
+
+ runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env)
+
+ #possible race condition wait for the process to start
+ time.sleep(1)
+
+
+
+ print "Mock server started process pid={0}".format(proc.pid)
+ #Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+ #so we give the subprocess time to start up
+ time.sleep(1)
+
+
+ except Exception as e:
+ raise
+ finally:
+ #close the process
+ if not(runnerproc is None):
+ runnerproc.kill()
+
+ if not(proc is None):
+ proc.kill()
+
if __name__ == "__main__":
    # Run the tests in this module when it is executed as a script.
    unittest.main()
Propchange: avro/trunk/lang/py/test/test_tether_task_runner.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/test/test_tether_word_count.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_word_count.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_word_count.py (added)
+++ avro/trunk/lang/py/test/test_tether_word_count.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import inspect
+import subprocess
+import sys
+import time
+import unittest
+import os
+
+import set_avro_test_path
+
+class TestTetherWordCount(unittest.TestCase):
+ """ unittest for a python tethered map-reduce job.
+ """
+
+ def _write_lines(self,lines,fname):
+ """
+ Write the lines to an avro file named fname
+
+ Parameters
+ --------------------------------------------------------
+ lines - list of strings to write
+ fname - the name of the file to write to.
+ """
+ import avro.io as avio
+ from avro.datafile import DataFileReader,DataFileWriter
+ from avro import schema
+
+ #recursively make all directories
+ dparts=fname.split(os.sep)[:-1]
+ for i in range(len(dparts)):
+ pdir=os.sep+os.sep.join(dparts[:i+1])
+ if not(os.path.exists(pdir)):
+ os.mkdir(pdir)
+
+
+ with file(fname,'w') as hf:
+ inschema="""{"type":"string"}"""
+ writer=DataFileWriter(hf,avio.DatumWriter(inschema),writers_schema=schema.parse(inschema))
+
+ #encoder = avio.BinaryEncoder(writer)
+ #datum_writer = avio.DatumWriter()
+ for datum in lines:
+ writer.append(datum)
+
+ writer.close()
+
+
+
+
+ def _count_words(self,lines):
+ """Return a dictionary counting the words in lines
+ """
+ counts={}
+
+ for line in lines:
+ words=line.split()
+
+ for w in words:
+ if not(counts.has_key(w.strip())):
+ counts[w.strip()]=0
+
+ counts[w.strip()]=counts[w.strip()]+1
+
+ return counts
+
+ def test1(self):
+ """
+ Run a tethered map-reduce job.
+
+ Assumptions: 1) bash is available in /bin/bash
+ """
+ from word_count_task import WordCountTask
+ from avro.tether import tether_task_runner
+ from avro.datafile import DataFileReader
+ from avro.io import DatumReader
+ import avro
+
+ import subprocess
+ import StringIO
+ import shutil
+ import tempfile
+ import inspect
+
+ proc=None
+
+ try:
+
+
+ # TODO we use the tempfile module to generate random names
+ # for the files
+ base_dir = "/tmp/test_tether_word_count"
+ if os.path.exists(base_dir):
+ shutil.rmtree(base_dir)
+
+ inpath = os.path.join(base_dir, "in")
+ infile=os.path.join(inpath, "lines.avro")
+ lines=["the quick brown fox jumps over the lazy dog",
+ "the cow jumps over the moon",
+ "the rain in spain falls mainly on the plains"]
+
+ self._write_lines(lines,infile)
+
+ true_counts=self._count_words(lines)
+
+ if not(os.path.exists(infile)):
+ self.fail("Missing the input file {0}".format(infile))
+
+
+ # The schema for the output of the mapper and reducer
+ oschema="""
+{"type":"record",
+ "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+ {"name":"key","type":"string"},
+ {"name":"value","type":"long","order":"ignore"}
+ ]
+}
+"""
+
+ # write the schema to a temporary file
+ osfile=tempfile.NamedTemporaryFile(mode='w',suffix=".avsc",prefix="wordcount",delete=False)
+ outschema=osfile.name
+ osfile.write(oschema)
+ osfile.close()
+
+ if not(os.path.exists(outschema)):
+ self.fail("Missing the schema file")
+
+ outpath = os.path.join(base_dir, "out")
+
+ args=[]
+
+ args.append("java")
+ args.append("-jar")
+ args.append(os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar"))
+
+
+ args.append("tether")
+ args.extend(["--in",inpath])
+ args.extend(["--out",outpath])
+ args.extend(["--outschema",outschema])
+ args.extend(["--protocol","http"])
+
+ # form the arguments for the subprocess
+ subargs=[]
+
+ srcfile=inspect.getsourcefile(tether_task_runner)
+
+ # Create a shell script to act as the program we want to execute
+ # We do this so we can set the python path appropriately
+ script="""#!/bin/bash
+export PYTHONPATH={0}
+python -m avro.tether.tether_task_runner word_count_task.WordCountTask
+"""
+ # We need to make sure avro is on the path
+ # getsourcefile(avro) returns .../avro/__init__.py
+ asrc=inspect.getsourcefile(avro)
+ apath=asrc.rsplit(os.sep,2)[0]
+
+ # path to where the tests lie
+ tpath=os.path.split(__file__)[0]
+
+ exhf=tempfile.NamedTemporaryFile(mode='w',prefix="exec_word_count_",delete=False)
+ exfile=exhf.name
+ exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
+ exhf.close()
+
+ # make it world executable
+ os.chmod(exfile,0755)
+
+ args.extend(["--program",exfile])
+
+ print "Command:\n\t{0}".format(" ".join(args))
+ proc=subprocess.Popen(args)
+
+
+ proc.wait()
+
+ # read the output
+ with file(os.path.join(outpath,"part-00000.avro")) as hf:
+ reader=DataFileReader(hf, DatumReader())
+ for record in reader:
+ self.assertEqual(record["value"],true_counts[record["key"]])
+
+ reader.close()
+
+ except Exception as e:
+ raise
+ finally:
+ # close the process
+ if proc is not None and proc.returncode is None:
+ proc.kill()
+ if os.path.exists(base_dir):
+ shutil.rmtree(base_dir)
+ if os.path.exists(exfile):
+ os.remove(exfile)
+
if __name__ == "__main__":
    # Run the tests in this module when it is executed as a script.
    unittest.main()
Propchange: avro/trunk/lang/py/test/test_tether_word_count.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py/test/word_count_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/word_count_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/word_count_task.py (added)
+++ avro/trunk/lang/py/test/word_count_task.py Tue Oct 7 14:02:17 2014
@@ -0,0 +1,96 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
# Names exported by "from word_count_task import *".
__all__ = ["WordCountTask"]
+
+from avro.tether import TetherTask
+
+import logging
+
# TODO: make the logging level a parameter we can set
# (for example via logging.basicConfig(level=logging.INFO)).
class WordCountTask(TetherTask):
    """
    Implements the mapper and reducer for the word count example.

    map emits a Pair {"key": word, "value": 1} for every whitespace-separated
    word of an input string; reduce accumulates the values seen for the
    current key, and reduceFlush emits the total once per key.
    """

    def __init__(self):
        """
        Declare the task's schemas and initialise the running sum.

        The input schema is an Avro string. The intermediate (mapper output)
        and final output schemas are both the org.apache.avro.mapred.Pair
        record with a string "key" and a long "value" whose sort order is
        "ignore".
        """
        inschema = """{"type":"string"}"""
        midschema = """{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
                      {"name":"key","type":"string"},
                      {"name":"value","type":"long","order":"ignore"}]
                      }"""
        outschema = midschema
        TetherTask.__init__(self, inschema, midschema, outschema)

        # Partial sum of the values seen for the current key; reset by reduceFlush.
        self.psum = 0

    def map(self, record, collector):
        """Implement the mapper for the word count example.

        Emits {"key": word, "value": 1} for each whitespace-separated word.

        Parameters
        ----------------------------------------------------------------------------
        record - The input record (a string)
        collector - The collector to collect the output
        """
        for word in record.split():
            # Pass the word as a logging argument instead of pre-formatting it:
            # under Python 2, "...{0}".format(u"...") raises UnicodeEncodeError
            # for non-ASCII unicode words, and logging formats lazily anyway.
            logging.info("WordCountTask.Map: word=%s", word)
            collector.collect({"key": word, "value": 1})

    def reduce(self, record, collector):
        """Called with input values to generate reducer output. Inputs are sorted
        by the mapper key.

        The reduce function is invoked once for each value belonging to a given
        key outputted by the mapper. It only accumulates; output is produced in
        reduceFlush.

        Parameters
        ----------------------------------------------------------------------------
        record - The mapper output
        collector - The collector to collect the output (unused here)
        """
        self.psum += record["value"]

    def reduceFlush(self, record, collector):
        """
        Called with the last intermediate value in each equivalence run.

        In other words, reduceFlush is invoked once for each key produced in the
        reduce phase. It is called after reduce has been invoked on each value
        for the given key. Emits {"key": key, "value": total} and resets the sum.

        Parameters
        ------------------------------------------------------------------
        record - the last record on which reduce was invoked.
        collector - The collector to collect the output
        """
        key = record["key"]
        logging.info("WordCountTask.reduceFlush key=%s value=%s", key, self.psum)

        # collect the total for the current key
        collector.collect({"key": key, "value": self.psum})

        # reset the sum for the next key
        self.psum = 0
Propchange: avro/trunk/lang/py/test/word_count_task.py
------------------------------------------------------------------------------
svn:eol-style = native