You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text version.
Posted to commits@avro.apache.org by ko...@apache.org on 2019/11/09 00:37:37 UTC
[avro] branch master updated: AVRO-2603: Setup and Teardown (#698)
This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new df663c5 AVRO-2603: Setup and Teardown (#698)
df663c5 is described below
commit df663c511b640f912b673f55c0c8b679996b6c79
Author: Michael A. Smith <mi...@smith-li.com>
AuthorDate: Fri Nov 8 19:37:25 2019 -0500
AVRO-2603: Setup and Teardown (#698)
---
lang/py/test/test_tether_word_count.py | 161 +++++++++++++++------------------
1 file changed, 75 insertions(+), 86 deletions(-)
diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index 8cf7b8f..1e0dada 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -34,6 +34,13 @@ import avro.schema
import avro.tether.tether_task_runner
import set_avro_test_path
+_TOP_DIR = """@TOPDIR@"""
+_AVRO_VERSION = """@AVRO_VERSION@"""
+_JAR_PATH = os.path.abspath(os.path.join(_TOP_DIR, "..", "java", "tools", "target", "avro-tools-{}.jar".format(_AVRO_VERSION)))
+
+_LINES = ("the quick brown fox jumps over the lazy dog",
+ "the cow jumps over the moon",
+ "the rain in spain falls mainly on the plains")
_IN_SCHEMA = '"string"'
# The schema for the output of the mapper and reducer
@@ -45,11 +52,60 @@ _OUT_SCHEMA = """{
{"name": "value", "type": "long", "order": "ignore"}]
}"""
+# Create a shell script to act as the program we want to execute
+# We do this so we can set the python path appropriately
+_PYTHON_PATH = os.pathsep.join([os.path.dirname(os.path.dirname(avro.__file__)),
+ os.path.dirname(__file__)])
+_SCRIPT = """#!/bin/sh
+PYTHONPATH={} python -m avro.tether.tether_task_runner word_count_task.WordCountTask
+""".format(_PYTHON_PATH)
+
class TestTetherWordCount(unittest.TestCase):
"""unittest for a python tethered map-reduce job."""
- def _write_lines(self,lines,fname):
+ _base_dir = None
+ _script_path = None
+ _input_path = None
+ _output_path = None
+ _output_schema_path = None
+
+ def setUp(self):
+ """Create temporary files for testing."""
+ prefix, _ = os.path.splitext(os.path.basename(__file__))
+ self._base_dir = tempfile.mkdtemp(prefix=prefix)
+
+ # We create the input path...
+ self._input_path = os.path.join(self._base_dir, "in")
+ if not os.path.exists(self._input_path):
+ os.makedirs(self._input_path)
+ infile = os.path.join(self._input_path, "lines.avro")
+ self._write_lines(_LINES, infile)
+ self.assertTrue(os.path.exists(infile), "Missing the input file {}".format(infile))
+
+ # ...and the output schema...
+ self._output_schema_path = os.path.join(self._base_dir, "output.avsc")
+ with open(self._output_schema_path, 'wb') as output_schema_handle:
+ output_schema_handle.write(_OUT_SCHEMA)
+ self.assertTrue(os.path.exists(self._output_schema_path), "Missing the schema file")
+
+ # ...and the script...
+ self._script_path = os.path.join(self._base_dir, "exec_word_count.sh")
+ with open(self._script_path, 'wb') as script_handle:
+ script_handle.write(_SCRIPT)
+ os.chmod(self._script_path, 0o755)
+
+ # ...but we just name the output path. The tether tool creates it.
+ self._output_path = os.path.join(self._base_dir, "out")
+
+ def tearDown(self):
+ """Remove temporary files used in testing."""
+ if os.path.exists(self._base_dir):
+ shutil.rmtree(self._base_dir)
+ if self._script_path is not None and os.path.exists(self._script_path):
+ os.remove(self._script_path)
+
+ def _write_lines(self, lines, fname):
"""
Write the lines to an avro file named fname
@@ -65,91 +121,24 @@ class TestTetherWordCount(unittest.TestCase):
writer.append(datum)
def test_tether_word_count(self):
- """
- Run a tethered map-reduce job.
-
- Assumptions: 1) bash is available in /bin/bash
- """
- exfile = None
- try:
- # TODO we use the tempfile module to generate random names
- # for the files
- base_dir = "/tmp/test_tether_word_count"
- if os.path.exists(base_dir):
- shutil.rmtree(base_dir)
-
- inpath = os.path.join(base_dir, "in")
- infile=os.path.join(inpath, "lines.avro")
- lines=["the quick brown fox jumps over the lazy dog",
- "the cow jumps over the moon",
- "the rain in spain falls mainly on the plains"]
-
- if not os.path.exists(inpath):
- os.makedirs(inpath)
- self._write_lines(lines,infile)
-
- self.assertTrue(os.path.exists(infile), "Missing the input file {}".format(infile))
-
- # write the schema to a temporary file
- with tempfile.NamedTemporaryFile(mode='wb',
- suffix=".avsc",
- prefix="wordcount",
- delete=False) as osfile:
- osfile.write(_OUT_SCHEMA)
- outschema = osfile.name
-
- self.assertTrue(os.path.exists(outschema), "Missing the schema file")
-
- outpath = os.path.join(base_dir, "out")
-
- srcfile = avro.tether.tether_task_runner.__file__
-
- # Create a shell script to act as the program we want to execute
- # We do this so we can set the python path appropriately
- script="""#!/bin/bash
-export PYTHONPATH={0}
-python -m avro.tether.tether_task_runner word_count_task.WordCountTask
-"""
- # We need to make sure avro is on the path
- # getsourcefile(avro) returns .../avro/__init__.py
- asrc = avro.__file__
- apath=asrc.rsplit(os.sep,2)[0]
-
- # path to where the tests lie
- tpath=os.path.split(__file__)[0]
-
- with tempfile.NamedTemporaryFile(mode='wb',
- prefix="exec_word_count_",
- delete=False) as exhf:
- exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
- exfile=exhf.name
-
- # make it world executable
- os.chmod(exfile,0o755)
-
- jar_path = os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar")
- args = ("java", "-jar", jar_path, "tether",
- "--in", inpath,
- "--out", outpath,
- "--outschema", outschema,
- "--protocol", "http",
- "--program", exfile)
-
- print("Command:\n\t{0}".format(" ".join(args)))
- subprocess.check_call(args)
-
- # read the output
- datum_reader = avro.io.DatumReader()
- outfile = os.path.join(outpath, "part-00000.avro")
- expected_counts = collections.Counter(' '.join(lines).split())
- with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as reader:
- actual_counts = {r["key"]: r["value"] for r in reader}
- self.assertDictEqual(actual_counts, expected_counts)
- finally:
- if os.path.exists(base_dir):
- shutil.rmtree(base_dir)
- if exfile is not None and os.path.exists(exfile):
- os.remove(exfile)
+ """Check that a tethered map-reduce job produces the output expected locally."""
+ # Run the job...
+ args = ("java", "-jar", _JAR_PATH, "tether",
+ "--protocol", "http",
+ "--in", self._input_path,
+ "--out", self._output_path,
+ "--outschema", self._output_schema_path,
+ "--program", self._script_path)
+ print("Command:\n\t{0}".format(" ".join(args)))
+ subprocess.check_call(args)
+
+ # ...and test the results.
+ datum_reader = avro.io.DatumReader()
+ outfile = os.path.join(self._output_path, "part-00000.avro")
+ expected_counts = collections.Counter(' '.join(_LINES).split())
+ with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as reader:
+ actual_counts = {r["key"]: r["value"] for r in reader}
+ self.assertDictEqual(actual_counts, expected_counts)
if __name__== "__main__":
unittest.main()