You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@avro.apache.org by ko...@apache.org on 2019/11/09 00:37:37 UTC

[avro] branch master updated: AVRO-2603: Setup and Teardown (#698)

This is an automated email from the ASF dual-hosted git repository.

kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new df663c5  AVRO-2603: Setup and Teardown (#698)
df663c5 is described below

commit df663c511b640f912b673f55c0c8b679996b6c79
Author: Michael A. Smith <mi...@smith-li.com>
AuthorDate: Fri Nov 8 19:37:25 2019 -0500

    AVRO-2603: Setup and Teardown (#698)
---
 lang/py/test/test_tether_word_count.py | 161 +++++++++++++++------------------
 1 file changed, 75 insertions(+), 86 deletions(-)

diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index 8cf7b8f..1e0dada 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -34,6 +34,13 @@ import avro.schema
 import avro.tether.tether_task_runner
 import set_avro_test_path
 
+_TOP_DIR = """@TOPDIR@"""
+_AVRO_VERSION = """@AVRO_VERSION@"""
+_JAR_PATH = os.path.abspath(os.path.join(_TOP_DIR, "..", "java", "tools", "target", "avro-tools-{}.jar".format(_AVRO_VERSION)))
+
+_LINES = ("the quick brown fox jumps over the lazy dog",
+          "the cow jumps over the moon",
+          "the rain in spain falls mainly on the plains")
 _IN_SCHEMA = '"string"'
 
 # The schema for the output of the mapper and reducer
@@ -45,11 +52,60 @@ _OUT_SCHEMA = """{
              {"name": "value", "type": "long", "order": "ignore"}]
 }"""
 
+# Create a shell script to act as the program we want to execute
+# We do this so we can set the python path appropriately
+_PYTHON_PATH = os.pathsep.join([os.path.dirname(os.path.dirname(avro.__file__)),
+                                os.path.dirname(__file__)])
+_SCRIPT = """#!/bin/sh
+PYTHONPATH={} python -m avro.tether.tether_task_runner word_count_task.WordCountTask
+""".format(_PYTHON_PATH)
+
 
 class TestTetherWordCount(unittest.TestCase):
   """unittest for a python tethered map-reduce job."""
 
-  def _write_lines(self,lines,fname):
+  _base_dir = None
+  _script_path = None
+  _input_path = None
+  _output_path = None
+  _output_schema_path = None
+
+  def setUp(self):
+    """Create temporary files for testing."""
+    prefix, _ = os.path.splitext(os.path.basename(__file__))
+    self._base_dir = tempfile.mkdtemp(prefix=prefix)
+
+    # We create the input path...
+    self._input_path = os.path.join(self._base_dir, "in")
+    if not os.path.exists(self._input_path):
+      os.makedirs(self._input_path)
+    infile = os.path.join(self._input_path, "lines.avro")
+    self._write_lines(_LINES, infile)
+    self.assertTrue(os.path.exists(infile), "Missing the input file {}".format(infile))
+
+    # ...and the output schema...
+    self._output_schema_path = os.path.join(self._base_dir, "output.avsc")
+    with open(self._output_schema_path, 'wb') as output_schema_handle:
+      output_schema_handle.write(_OUT_SCHEMA)
+    self.assertTrue(os.path.exists(self._output_schema_path), "Missing the schema file")
+
+    # ...and the script...
+    self._script_path = os.path.join(self._base_dir, "exec_word_count.sh")
+    with open(self._script_path, 'wb') as script_handle:
+      script_handle.write(_SCRIPT)
+    os.chmod(self._script_path, 0o755)
+
+    # ...but we just name the output path. The tether tool creates it.
+    self._output_path = os.path.join(self._base_dir, "out")
+
+  def tearDown(self):
+    """Remove temporary files used in testing."""
+    if os.path.exists(self._base_dir):
+      shutil.rmtree(self._base_dir)
+    if self._script_path is not None and os.path.exists(self._script_path):
+      os.remove(self._script_path)
+
+  def _write_lines(self, lines, fname):
     """
     Write the lines to an avro file named fname
 
@@ -65,91 +121,24 @@ class TestTetherWordCount(unittest.TestCase):
         writer.append(datum)
 
   def test_tether_word_count(self):
-    """
-    Run a tethered map-reduce job.
-
-    Assumptions: 1) bash is available in /bin/bash
-    """
-    exfile = None
-    try:
-      # TODO we use the tempfile module to generate random names
-      # for the files
-      base_dir = "/tmp/test_tether_word_count"
-      if os.path.exists(base_dir):
-        shutil.rmtree(base_dir)
-
-      inpath = os.path.join(base_dir, "in")
-      infile=os.path.join(inpath, "lines.avro")
-      lines=["the quick brown fox jumps over the lazy dog",
-             "the cow jumps over the moon",
-             "the rain in spain falls mainly on the plains"]
-
-      if not os.path.exists(inpath):
-        os.makedirs(inpath)
-      self._write_lines(lines,infile)
-
-      self.assertTrue(os.path.exists(infile), "Missing the input file {}".format(infile))
-
-      # write the schema to a temporary file
-      with tempfile.NamedTemporaryFile(mode='wb',
-                                       suffix=".avsc",
-                                       prefix="wordcount",
-                                       delete=False) as osfile:
-        osfile.write(_OUT_SCHEMA)
-      outschema = osfile.name
-
-      self.assertTrue(os.path.exists(outschema), "Missing the schema file")
-
-      outpath = os.path.join(base_dir, "out")
-
-      srcfile = avro.tether.tether_task_runner.__file__
-
-      # Create a shell script to act as the program we want to execute
-      # We do this so we can set the python path appropriately
-      script="""#!/bin/bash
-export PYTHONPATH={0}
-python -m avro.tether.tether_task_runner word_count_task.WordCountTask
-"""
-      # We need to make sure avro is on the path
-      # getsourcefile(avro) returns .../avro/__init__.py
-      asrc = avro.__file__
-      apath=asrc.rsplit(os.sep,2)[0]
-
-      # path to where the tests lie
-      tpath=os.path.split(__file__)[0]
-
-      with tempfile.NamedTemporaryFile(mode='wb',
-                                       prefix="exec_word_count_",
-                                       delete=False) as exhf:
-        exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
-      exfile=exhf.name
-
-      # make it world executable
-      os.chmod(exfile,0o755)
-
-      jar_path = os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar")
-      args = ("java", "-jar", jar_path, "tether",
-              "--in", inpath,
-              "--out", outpath,
-              "--outschema", outschema,
-              "--protocol", "http",
-              "--program", exfile)
-
-      print("Command:\n\t{0}".format(" ".join(args)))
-      subprocess.check_call(args)
-
-      # read the output
-      datum_reader = avro.io.DatumReader()
-      outfile = os.path.join(outpath, "part-00000.avro")
-      expected_counts = collections.Counter(' '.join(lines).split())
-      with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as reader:
-        actual_counts = {r["key"]: r["value"] for r in reader}
-      self.assertDictEqual(actual_counts, expected_counts)
-    finally:
-      if os.path.exists(base_dir):
-        shutil.rmtree(base_dir)
-      if exfile is not None and os.path.exists(exfile):
-        os.remove(exfile)
+    """Check that a tethered map-reduce job produces the output expected locally."""
+    # Run the job...
+    args = ("java", "-jar", _JAR_PATH, "tether",
+            "--protocol", "http",
+            "--in", self._input_path,
+            "--out", self._output_path,
+            "--outschema", self._output_schema_path,
+            "--program", self._script_path)
+    print("Command:\n\t{0}".format(" ".join(args)))
+    subprocess.check_call(args)
+
+    # ...and test the results.
+    datum_reader = avro.io.DatumReader()
+    outfile = os.path.join(self._output_path, "part-00000.avro")
+    expected_counts = collections.Counter(' '.join(_LINES).split())
+    with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as reader:
+      actual_counts = {r["key"]: r["value"] for r in reader}
+    self.assertDictEqual(actual_counts, expected_counts)
 
 if __name__== "__main__":
   unittest.main()