You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/15 22:01:48 UTC

[airflow] 01/01: Further validation that only task commands are run by executors (#9240)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7d332c583df359a250428f19e48a2d889787c17a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 12 01:17:43 2020 +0100

    Further validation that only task commands are run by executors (#9240)
    
    (cherry picked from commit 99c534e9faf)
---
 airflow/executors/kubernetes_executor.py    |  7 ++-----
 tests/executors/test_dask_executor.py       |  6 +++---
 tests/executors/test_kubernetes_executor.py |  3 ++-
 tests/executors/test_local_executor.py      | 20 +++++++++++++++-----
 4 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index b62462f..8b5fdc1 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -395,11 +395,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
         key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date, try_number = key
 
-        if isinstance(command, str):
-            command = [command]
-
-        if command[0] != "airflow":
-            raise ValueError('The first element of command must be equal to "airflow".')
+        if command[0:2] != ["airflow", "run"]:
+            raise ValueError('The command must start with ["airflow", "run"].')
 
         config_pod = self.worker_configuration.make_pod(
             namespace=self.namespace,
diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py
index f43a54f..2bf9d8e 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -50,12 +50,12 @@ pytestmark = pytest.mark.xfail(condition=True, reason="The Dask executor is expe
 class BaseDaskTest(unittest.TestCase):
 
     def assert_tasks_on_executor(self, executor):
+
+        success_command = ['airflow', 'run', '--help']
+        fail_command = ['airflow', 'run', 'false']
         # start the executor
         executor.start()
 
-        success_command = ['true', 'some_parameter']
-        fail_command = ['false', 'some_parameter']
-
         executor.execute_async(key='success', command=success_command)
         executor.execute_async(key='fail', command=fail_command)
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9073493..77299f6 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -155,7 +155,8 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Execute a task while the Api Throws errors
         try_number = 1
         kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
-                                         command='command', executor_config={})
+                                         command=['airflow', 'run', 'true', 'some_parameter'],
+                                         executor_config={})
         kubernetesExecutor.sync()
         kubernetesExecutor.sync()
 
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 3aebbe2..67f0b28 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import subprocess
 import unittest
 from tests.compat import mock
 
@@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase):
 
     TEST_SUCCESS_COMMANDS = 5
 
-    def execution_parallelism(self, parallelism=0):
+    @mock.patch('airflow.executors.local_executor.subprocess.check_call')
+    def execution_parallelism(self, mock_check_call, parallelism=0):
+        success_command = ['airflow', 'run', 'true', 'some_parameter']
+        fail_command = ['airflow', 'run', 'false']
+
+        def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-argument
+            if command != success_command:
+                raise subprocess.CalledProcessError(returncode=1, cmd=command)
+            else:
+                return 0
+
+        mock_check_call.side_effect = fake_execute_command
         executor = LocalExecutor(parallelism=parallelism)
         executor.start()
 
         success_key = 'success {}'
-        success_command = ['true', 'some_parameter']
-        fail_command = ['false', 'some_parameter']
         self.assertTrue(executor.result_queue.empty())
 
         for i in range(self.TEST_SUCCESS_COMMANDS):
@@ -58,11 +68,11 @@ class LocalExecutorTest(unittest.TestCase):
         self.assertEqual(executor.workers_used, expected)
 
     def test_execution_unlimited_parallelism(self):
-        self.execution_parallelism(parallelism=0)
+        self.execution_parallelism(parallelism=0)  # pylint: disable=no-value-for-parameter
 
     def test_execution_limited_parallelism(self):
         test_parallelism = 2
-        self.execution_parallelism(parallelism=test_parallelism)
+        self.execution_parallelism(parallelism=test_parallelism)  # pylint: disable=no-value-for-parameter
 
     @mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')