You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/07/16 12:53:00 UTC

[avro] branch master updated: AVRO-2889: Fix Test Threading Race Condition (#932)

This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new c903aa6  AVRO-2889: Fix Test Threading Race Condition (#932)
c903aa6 is described below

commit c903aa6d6fc42d3c347f95d469a8364ea44165e8
Author: Michael A. Smith <mi...@smith-li.com>
AuthorDate: Thu Jul 16 08:52:51 2020 -0400

    AVRO-2889: Fix Test Threading Race Condition (#932)
---
 lang/py/avro/test/test_tether_task_runner.py |  6 ++++++
 lang/py/avro/tether/tether_task_runner.py    | 14 ++++++--------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/lang/py/avro/test/test_tether_task_runner.py b/lang/py/avro/test/test_tether_task_runner.py
index 10582d3..90e01cb 100644
--- a/lang/py/avro/test/test_tether_task_runner.py
+++ b/lang/py/avro/test/test_tether_task_runner.py
@@ -66,6 +66,12 @@ class TestTetherTaskRunner(unittest.TestCase):
             runner = avro.tether.tether_task_runner.TaskRunner(avro.test.word_count_task.WordCountTask())
 
             runner.start(outputport=parent_port, join=False)
+            for _ in range(12):
+                if runner.server is not None:
+                    break
+                time.sleep(1)
+            else:
+                raise RuntimeError("Server never started")
 
             # Test sending various messages to the server and ensuring they are processed correctly
             requestor = avro.tether.tether_task.HTTPRequestor(
diff --git a/lang/py/avro/tether/tether_task_runner.py b/lang/py/avro/tether/tether_task_runner.py
index 602625c..2e00c61 100644
--- a/lang/py/avro/tether/tether_task_runner.py
+++ b/lang/py/avro/tether/tether_task_runner.py
@@ -141,6 +141,9 @@ class TaskRunner(object):
     implements the logic for the mapper and reducer phases
     """
 
+    server = None
+    sthread = None
+
     def __init__(self, task):
         """
         Construct the runner
@@ -149,15 +152,11 @@ class TaskRunner(object):
         ---------------------------------------------------------------
         task - An instance of tether task
         """
-
         self.log = logging.getLogger("TaskRunner:")
-
-        if not(isinstance(task, avro.tether.tether_task.TetherTask)):
-            raise ValueError("task must be an instance of tether task")
         self.task = task
 
-        self.server = None
-        self.sthread = None
+        if not isinstance(task, avro.tether.tether_task.TetherTask):
+            raise ValueError("task must be an instance of tether task")
 
     def start(self, outputport=None, join=True):
         """
@@ -175,7 +174,6 @@ class TaskRunner(object):
                     we can resume execution in this thread so that we can do additional
                     testing
         """
-
         port = avro.tether.util.find_port()
         address = ("localhost", port)
 
@@ -189,7 +187,7 @@ class TaskRunner(object):
         sthread.start()
 
         self.sthread = sthread
-        # This needs to run in a separat thread b\c serve_forever() blocks
+        # This needs to run in a separate thread because serve_forever() blocks.
         self.task.open(port, clientPort=outputport)
 
         # wait for the other thread to finish