You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/15 12:19:01 UTC

[02/10] incubator-ariatosca git commit: ARIA-296 Process termination test fails on Windows

ARIA-296 Process termination test fails on Windows


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a3059fe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a3059fe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a3059fe3

Branch: refs/heads/rest_client_work
Commit: a3059fe3ff50a473eb23171b305f4cbe272bcf77
Parents: f32845f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Thu Jun 29 10:55:54 2017 +0300
Committer: Ran Ziv <ra...@gigaspaces.com>
Committed: Mon Jul 10 16:59:53 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  8 ++--
 .../workflows/executor/test_process_executor.py | 47 ++++++++++++++------
 2 files changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a3059fe3/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 69288ea..81da26f 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -26,7 +26,6 @@ import sys
 # entry point. We thus remove this module's directory from the python path if it happens to be
 # there
 
-import signal
 from collections import namedtuple
 
 script_dir = os.path.dirname(__file__)
@@ -121,7 +120,8 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
-        for task_id in self._tasks:
+        # we use set(self._tasks) since tasks may change in the process of closing
+        for task_id in set(self._tasks):
             self.terminate(task_id)
 
     def terminate(self, task_id):
@@ -132,10 +132,10 @@ class ProcessExecutor(base.BaseExecutor):
                 parent_process = psutil.Process(task.proc.pid)
                 for child_process in reversed(parent_process.children(recursive=True)):
                     try:
-                        child_process.send_signal(signal.SIGKILL)
+                        child_process.kill()
                     except BaseException:
                         pass
-                parent_process.send_signal(signal.SIGKILL)
+                parent_process.kill()
             except BaseException:
                 pass
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a3059fe3/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index d3e7a6d..f8fc567 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import os
+import sys
 import time
 import Queue
 import subprocess
@@ -23,6 +24,7 @@ import psutil
 import retrying
 
 import aria
+
 from aria import operation
 from aria.modeling import models
 from aria.orchestrator import events
@@ -78,15 +80,27 @@ class TestProcessExecutor(object):
             executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
-    @pytest.mark.skipif(os.name == 'nt', reason='uses bash script')
-    def test_process_termination(self, executor, model, fs_test_holder):
-        argument = models.Argument.wrap('holder_path', fs_test_holder._path)
-        model.argument.put(argument)
+    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+        freeze_script_path = str(tmpdir.join('freeze_script'))
+        with open(freeze_script_path, 'w+b') as f:
+            f.write(
+                '''import time
+while True:
+    time.sleep(5)
+                '''
+            )
+        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        script_path_argument = models.Argument.wrap('freezing_script_path',
+                                                    str(tmpdir.join('freeze_script')))
+
+        model.argument.put(holder_path_argument)
+        model.argument.put(script_path_argument)
         ctx = MockContext(
             model,
             task_kwargs=dict(
                 function='{0}.{1}'.format(__name__, freezing_task.__name__),
-                arguments=dict(holder_path=argument)),
+                arguments=dict(holder_path=holder_path_argument,
+                               freezing_script_path=script_path_argument)),
         )
 
         executor.execute(ctx)
@@ -95,16 +109,23 @@ class TestProcessExecutor(object):
         def wait_for_extra_process_id():
             return fs_test_holder.get('subproc', False)
 
-        pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()]
-        assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+        task_pid = executor._tasks[ctx.task.id].proc.pid
+        extra_process_pid = wait_for_extra_process_id()
+
+        assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
         executor.terminate(ctx.task.id)
 
         # Give a chance to the processes to terminate
-        time.sleep(10) # windows might require more time
-        assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
-                       for p in psutil.process_iter()
-                       for pid in pids)
+        time.sleep(2)
 
+        # all processes should be either zombies or nonexistent
+        pids = [task_pid, extra_process_pid]
+        for pid in pids:
+            if pid in psutil.pids():
+                assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
+            else:
+                # making the test more readable
+                assert pid not in psutil.pids()
 
 @pytest.fixture
 def fs_test_holder(tmpdir):
@@ -138,8 +159,8 @@ def model(tmpdir):
 
 
 @operation
-def freezing_task(holder_path, **_):
+def freezing_task(holder_path, freezing_script_path, **_):
     holder = FilesystemDataHolder(holder_path)
-    holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid
     while True:
         time.sleep(5)