You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/07/16 12:53:00 UTC
[avro] branch master updated: AVRO-2889: Fix Test Threading Race
Condition (#932)
This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new c903aa6 AVRO-2889: Fix Test Threading Race Condition (#932)
c903aa6 is described below
commit c903aa6d6fc42d3c347f95d469a8364ea44165e8
Author: Michael A. Smith <mi...@smith-li.com>
AuthorDate: Thu Jul 16 08:52:51 2020 -0400
AVRO-2889: Fix Test Threading Race Condition (#932)
---
lang/py/avro/test/test_tether_task_runner.py | 6 ++++++
lang/py/avro/tether/tether_task_runner.py | 14 ++++++--------
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/lang/py/avro/test/test_tether_task_runner.py b/lang/py/avro/test/test_tether_task_runner.py
index 10582d3..90e01cb 100644
--- a/lang/py/avro/test/test_tether_task_runner.py
+++ b/lang/py/avro/test/test_tether_task_runner.py
@@ -66,6 +66,12 @@ class TestTetherTaskRunner(unittest.TestCase):
runner = avro.tether.tether_task_runner.TaskRunner(avro.test.word_count_task.WordCountTask())
runner.start(outputport=parent_port, join=False)
+ for _ in range(12):
+ if runner.server is not None:
+ break
+ time.sleep(1)
+ else:
+ raise RuntimeError("Server never started")
# Test sending various messages to the server and ensuring they are processed correctly
requestor = avro.tether.tether_task.HTTPRequestor(
diff --git a/lang/py/avro/tether/tether_task_runner.py b/lang/py/avro/tether/tether_task_runner.py
index 602625c..2e00c61 100644
--- a/lang/py/avro/tether/tether_task_runner.py
+++ b/lang/py/avro/tether/tether_task_runner.py
@@ -141,6 +141,9 @@ class TaskRunner(object):
implements the logic for the mapper and reducer phases
"""
+ server = None
+ sthread = None
+
def __init__(self, task):
"""
Construct the runner
@@ -149,15 +152,11 @@ class TaskRunner(object):
---------------------------------------------------------------
task - An instance of tether task
"""
-
self.log = logging.getLogger("TaskRunner:")
-
- if not(isinstance(task, avro.tether.tether_task.TetherTask)):
- raise ValueError("task must be an instance of tether task")
self.task = task
- self.server = None
- self.sthread = None
+ if not isinstance(task, avro.tether.tether_task.TetherTask):
+ raise ValueError("task must be an instance of tether task")
def start(self, outputport=None, join=True):
"""
@@ -175,7 +174,6 @@ class TaskRunner(object):
we can resume execution in this thread so that we can do additional
testing
"""
-
port = avro.tether.util.find_port()
address = ("localhost", port)
@@ -189,7 +187,7 @@ class TaskRunner(object):
sthread.start()
self.sthread = sthread
- # This needs to run in a separat thread b\c serve_forever() blocks
+ # This needs to run in a separate thread because serve_forever() blocks.
self.task.open(port, clientPort=outputport)
# wait for the other thread to finish