You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/10/07 16:02:18 UTC

svn commit: r1629897 [2/2] - in /avro/trunk: ./ lang/java/ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/ lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/ lang/java/tools/ lang/java/tools/src/main/java/org/...

Added: avro/trunk/lang/py/test/test_tether_task_runner.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_task_runner.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_task_runner.py (added)
+++ avro/trunk/lang/py/test/test_tether_task_runner.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+import time
+import unittest
+
+import set_avro_test_path
+
+
+class TestTetherTaskRunner(unittest.TestCase):
+  """ unit test for a tethered task runner.
+  """
+
+  def test1(self):
+    from word_count_task import WordCountTask
+    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
+    from avro import io as avio
+    import mock_tether_parent
+    import subprocess
+    import StringIO
+    import logging
+
+    # set the logging level to debug so that debug messages are printed
+    logging.basicConfig(level=logging.DEBUG)
+
+    proc=None
+    try:
+      # launch the server in a separate process
+      env=dict()
+      env["PYTHONPATH"]=':'.join(sys.path)
+      parent_port=find_port()
+
+      pyfile=mock_tether_parent.__file__
+      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+      input_port=find_port()
+
+      print "Mock server started process pid={0}".format(proc.pid)
+      # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+      # so we give the subprocess time to start up
+      time.sleep(1)
+
+      runner=TaskRunner(WordCountTask())
+
+      runner.start(outputport=parent_port,join=False)
+
+      # Test sending various messages to the server and ensuring they are
+      # processed correctly
+      requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol)
+
+      # TODO: We should validate that open worked by grabbing the STDOUT of the subprocess
+      # and ensuring that it outputted the correct message.
+
+      # Test the mapper
+      requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)})
+
+      # Serialize some data so we can send it to the input function
+      datum="This is a line of text"
+      writer = StringIO.StringIO()
+      encoder = avio.BinaryEncoder(writer)
+      datum_writer = avio.DatumWriter(runner.task.inschema)
+      datum_writer.write(datum, encoder)
+
+      writer.seek(0)
+      data=writer.read()
+
+
+      # Call input to simulate calling map
+      requestor.request("input",{"data":data,"count":1})
+
+      #Test the reducer
+      requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)})
+
+      #Serialize some data so we can send it to the input function
+      datum={"key":"word","value":2}
+      writer = StringIO.StringIO()
+      encoder = avio.BinaryEncoder(writer)
+      datum_writer = avio.DatumWriter(runner.task.midschema)
+      datum_writer.write(datum, encoder)
+
+      writer.seek(0)
+      data=writer.read()
+
+
+      #Call input to simulate calling reduce
+      requestor.request("input",{"data":data,"count":1})
+
+      requestor.request("complete",{})
+
+
+      runner.task.ready_for_shutdown.wait()
+      runner.server.shutdown()
+      #time.sleep(2)
+      #runner.server.shutdown()
+
+      sthread=runner.sthread
+
+      #Possible race condition?
+      time.sleep(1)
+
+      #make sure the other thread terminated
+      self.assertFalse(sthread.isAlive())
+
+      #shutdown the logging
+      logging.shutdown()
+
+    except Exception as e:
+      raise
+    finally:
+      #close the process
+      if not(proc is None):
+        proc.kill()
+
+
+  def test2(self):
+    """
+    In this test we want to make sure that when we run "tether_task_runner.py"
+    as our main script everything works as expected. We do this by using subprocess to run it
+    in a separate process.
+    """
+    from word_count_task import WordCountTask
+    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
+    from avro.tether import tether_task_runner
+    from avro import io as avio
+    import mock_tether_parent
+    import subprocess
+    import StringIO
+
+
+    proc=None
+
+    runnerproc=None
+    try:
+      #launch the server in a separate process
+      env=dict()
+      env["PYTHONPATH"]=':'.join(sys.path)
+      parent_port=find_port()
+
+      pyfile=mock_tether_parent.__file__
+      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+
+      #Possible race condition? When we start tether_task_runner it will call
+      # open, which tries to connect to the subprocess before the subprocess is fully
+      # started, so we give the subprocess time to start up
+      time.sleep(1)
+
+
+      #start the tether_task_runner in a separate process
+      env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
+      env["PYTHONPATH"]=':'.join(sys.path)
+
+      runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env)
+
+      #Possible race condition: wait for the process to start
+      time.sleep(1)
+
+
+
+      print "Mock server started process pid={0}".format(proc.pid)
+      #Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
+      #so we give the subprocess time to start up
+      time.sleep(1)
+
+
+    except Exception as e:
+      raise
+    finally:
+      #close the process
+      if not(runnerproc is None):
+        runnerproc.kill()
+
+      if not(proc is None):
+        proc.kill()
+
+if __name__==("__main__"):
+  unittest.main()

Propchange: avro/trunk/lang/py/test/test_tether_task_runner.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py/test/test_tether_word_count.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_tether_word_count.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_tether_word_count.py (added)
+++ avro/trunk/lang/py/test/test_tether_word_count.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import inspect
+import subprocess
+import sys
+import time
+import unittest
+import os
+
+import set_avro_test_path
+
+class TestTetherWordCount(unittest.TestCase):
+  """ unittest for a python tethered map-reduce job.
+  """
+
+  def _write_lines(self,lines,fname):
+    """
+    Write the lines to an avro file named fname
+
+    Parameters
+    --------------------------------------------------------
+    lines - list of strings to write
+    fname - the name of the file to write to.
+    """
+    import avro.io as avio
+    from avro.datafile import DataFileReader,DataFileWriter
+    from avro import schema
+
+    #recursively make all directories
+    dparts=fname.split(os.sep)[:-1]
+    for i in range(len(dparts)):
+      pdir=os.sep+os.sep.join(dparts[:i+1])
+      if not(os.path.exists(pdir)):
+        os.mkdir(pdir)
+
+
+    with file(fname,'w') as hf:
+      inschema="""{"type":"string"}"""
+      writer=DataFileWriter(hf,avio.DatumWriter(inschema),writers_schema=schema.parse(inschema))
+
+      #encoder = avio.BinaryEncoder(writer)
+      #datum_writer = avio.DatumWriter()
+      for datum in lines:
+        writer.append(datum)
+
+      writer.close()
+
+
+
+
+  def _count_words(self,lines):
+    """Return a dictionary counting the words in lines
+    """
+    counts={}
+
+    for line in lines:
+      words=line.split()
+
+      for w in words:
+        if not(counts.has_key(w.strip())):
+          counts[w.strip()]=0
+
+        counts[w.strip()]=counts[w.strip()]+1
+
+    return counts
+
+  def test1(self):
+    """
+    Run a tethered map-reduce job.
+
+    Assumptions: 1) bash is available in /bin/bash
+    """
+    from word_count_task import WordCountTask
+    from avro.tether import tether_task_runner
+    from avro.datafile import DataFileReader
+    from avro.io import DatumReader
+    import avro
+
+    import subprocess
+    import StringIO
+    import shutil
+    import tempfile
+    import inspect
+
+    proc=None
+
+    try:
+
+
+      # TODO: we should use the tempfile module to generate random names
+      # for the files (base_dir below is a fixed path)
+      base_dir = "/tmp/test_tether_word_count"
+      if os.path.exists(base_dir):
+        shutil.rmtree(base_dir)
+
+      inpath = os.path.join(base_dir, "in")
+      infile=os.path.join(inpath, "lines.avro")
+      lines=["the quick brown fox jumps over the lazy dog",
+             "the cow jumps over the moon",
+             "the rain in spain falls mainly on the plains"]
+
+      self._write_lines(lines,infile)
+
+      true_counts=self._count_words(lines)
+
+      if not(os.path.exists(infile)):
+        self.fail("Missing the input file {0}".format(infile))
+
+
+      # The schema for the output of the mapper and reducer
+      oschema="""
+{"type":"record",
+ "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+     {"name":"key","type":"string"},
+     {"name":"value","type":"long","order":"ignore"}
+ ]
+}
+"""
+
+      # write the schema to a temporary file
+      osfile=tempfile.NamedTemporaryFile(mode='w',suffix=".avsc",prefix="wordcount",delete=False)
+      outschema=osfile.name
+      osfile.write(oschema)
+      osfile.close()
+
+      if not(os.path.exists(outschema)):
+        self.fail("Missing the schema file")
+
+      outpath = os.path.join(base_dir, "out")
+
+      args=[]
+
+      args.append("java")
+      args.append("-jar")
+      args.append(os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar"))
+
+
+      args.append("tether")
+      args.extend(["--in",inpath])
+      args.extend(["--out",outpath])
+      args.extend(["--outschema",outschema])
+      args.extend(["--protocol","http"])
+
+      # form the arguments for the subprocess
+      subargs=[]
+
+      srcfile=inspect.getsourcefile(tether_task_runner)
+
+      # Create a shell script to act as the program we want to execute
+      # We do this so we can set the python path appropriately
+      script="""#!/bin/bash
+export PYTHONPATH={0}
+python -m avro.tether.tether_task_runner word_count_task.WordCountTask
+"""
+      # We need to make sure avro is on the path
+      # getsourcefile(avro) returns .../avro/__init__.py
+      asrc=inspect.getsourcefile(avro)
+      apath=asrc.rsplit(os.sep,2)[0]
+
+      # path to where the tests lie
+      tpath=os.path.split(__file__)[0]
+
+      exhf=tempfile.NamedTemporaryFile(mode='w',prefix="exec_word_count_",delete=False)
+      exfile=exhf.name
+      exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
+      exhf.close()
+
+      # make it world executable
+      os.chmod(exfile,0755)
+
+      args.extend(["--program",exfile])
+
+      print "Command:\n\t{0}".format(" ".join(args))
+      proc=subprocess.Popen(args)
+
+
+      proc.wait()
+
+      # read the output
+      with file(os.path.join(outpath,"part-00000.avro")) as hf:
+        reader=DataFileReader(hf, DatumReader())
+        for record in reader:
+          self.assertEqual(record["value"],true_counts[record["key"]])
+
+        reader.close()
+
+    except Exception as e:
+      raise
+    finally:
+      # close the process
+      if proc is not None and proc.returncode is None:
+        proc.kill()
+      if os.path.exists(base_dir):
+        shutil.rmtree(base_dir)
+      if os.path.exists(exfile):
+        os.remove(exfile)
+
+if __name__== "__main__":
+  unittest.main()

Propchange: avro/trunk/lang/py/test/test_tether_word_count.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py/test/word_count_task.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/word_count_task.py?rev=1629897&view=auto
==============================================================================
--- avro/trunk/lang/py/test/word_count_task.py (added)
+++ avro/trunk/lang/py/test/word_count_task.py Tue Oct  7 14:02:17 2014
@@ -0,0 +1,96 @@
+"""
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+__all__=["WordCountTask"]
+
+from avro.tether import TetherTask
+
+import logging
+
+#TODO::Make the logging level a parameter we can set
+#logging.basicConfig(level=logging.INFO)
+class WordCountTask(TetherTask):
+  """
+  Implements the mapper and reducer for the word count example
+  """
+
+  def __init__(self):
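+    """Create the task with a string input schema and the Pair record
+    schema for both the intermediate and final output."""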
+
+    inschema="""{"type":"string"}"""
+    midschema="""{"type":"record", "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
+              {"name":"key","type":"string"},
+              {"name":"value","type":"long","order":"ignore"}]
+              }"""
+    outschema=midschema
+    TetherTask.__init__(self,inschema,midschema,outschema)
+
+
+    #keep track of the partial sums of the counts
+    self.psum=0
+
+
+  def map(self,record,collector):
+    """Implement the mapper for the word count example
+
+    Parameters
+    ----------------------------------------------------------------------------
+    record - The input record
+    collector - The collector to collect the output
+    """
+
+    words=record.split()
+
+    for w in words:
+      logging.info("WordCountTask.Map: word={0}".format(w))
+      collector.collect({"key":w,"value":1})
+
+  def reduce(self,record, collector):
+    """Called with input values to generate reducer output. Inputs are sorted by the mapper
+    key.
+
+    The reduce function is invoked once for each value belonging to a given key outputted
+    by the mapper.
+
+    Parameters
+    ----------------------------------------------------------------------------
+    record - The mapper output
+    collector - The collector to collect the output
+    """
+
+    self.psum+=record["value"]
+
+  def reduceFlush(self,record, collector):
+    """
+    Called with the last intermediate value in each equivalence run.
+    In other words, reduceFlush is invoked once for each key produced in the reduce
+    phase. It is called after reduce has been invoked on each value for the given key.
+
+    Parameters
+    ------------------------------------------------------------------
+    record - the last record on which reduce was invoked.
+    """
+
+    #collect the current record
+    logging.info("WordCountTask.reduceFlush key={0} value={1}".format(record["key"],self.psum))
+
+    collector.collect({"key":record["key"],"value":self.psum})
+
+    #reset the sum
+    self.psum=0

Propchange: avro/trunk/lang/py/test/word_count_task.py
------------------------------------------------------------------------------
    svn:eol-style = native