You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/15 22:01:48 UTC
[airflow] 01/01: Further validation that only task commands are run
by executors (#9240)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7d332c583df359a250428f19e48a2d889787c17a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 12 01:17:43 2020 +0100
Further validation that only task commands are run by executors (#9240)
(cherry-picked from 99c534e9faf)
---
airflow/executors/kubernetes_executor.py | 7 ++-----
tests/executors/test_dask_executor.py | 6 +++---
tests/executors/test_kubernetes_executor.py | 3 ++-
tests/executors/test_local_executor.py | 20 +++++++++++++++-----
4 files changed, 22 insertions(+), 14 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index b62462f..8b5fdc1 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -395,11 +395,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
key, command, kube_executor_config = next_job
dag_id, task_id, execution_date, try_number = key
- if isinstance(command, str):
- command = [command]
-
- if command[0] != "airflow":
- raise ValueError('The first element of command must be equal to "airflow".')
+ if command[0:2] != ["airflow", "run"]:
+ raise ValueError('The command must start with ["airflow", "run"].')
config_pod = self.worker_configuration.make_pod(
namespace=self.namespace,
diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py
index f43a54f..2bf9d8e 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -50,12 +50,12 @@ pytestmark = pytest.mark.xfail(condition=True, reason="The Dask executor is expe
class BaseDaskTest(unittest.TestCase):
def assert_tasks_on_executor(self, executor):
+
+ success_command = ['airflow', 'run', '--help']
+ fail_command = ['airflow', 'run', 'false']
# start the executor
executor.start()
- success_command = ['true', 'some_parameter']
- fail_command = ['false', 'some_parameter']
-
executor.execute_async(key='success', command=success_command)
executor.execute_async(key='fail', command=fail_command)
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9073493..77299f6 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -155,7 +155,8 @@ class TestKubernetesExecutor(unittest.TestCase):
# Execute a task while the Api Throws errors
try_number = 1
kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
- command='command', executor_config={})
+ command=['airflow', 'run', 'true', 'some_parameter'],
+ executor_config={})
kubernetesExecutor.sync()
kubernetesExecutor.sync()
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 3aebbe2..67f0b28 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
+import subprocess
import unittest
from tests.compat import mock
@@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase):
TEST_SUCCESS_COMMANDS = 5
- def execution_parallelism(self, parallelism=0):
+ @mock.patch('airflow.executors.local_executor.subprocess.check_call')
+ def execution_parallelism(self, mock_check_call, parallelism=0):
+ success_command = ['airflow', 'run', 'true', 'some_parameter']
+ fail_command = ['airflow', 'run', 'false']
+
+ def fake_execute_command(command, close_fds=True): # pylint: disable=unused-argument
+ if command != success_command:
+ raise subprocess.CalledProcessError(returncode=1, cmd=command)
+ else:
+ return 0
+
+ mock_check_call.side_effect = fake_execute_command
executor = LocalExecutor(parallelism=parallelism)
executor.start()
success_key = 'success {}'
- success_command = ['true', 'some_parameter']
- fail_command = ['false', 'some_parameter']
self.assertTrue(executor.result_queue.empty())
for i in range(self.TEST_SUCCESS_COMMANDS):
@@ -58,11 +68,11 @@ class LocalExecutorTest(unittest.TestCase):
self.assertEqual(executor.workers_used, expected)
def test_execution_unlimited_parallelism(self):
- self.execution_parallelism(parallelism=0)
+ self.execution_parallelism(parallelism=0) # pylint: disable=no-value-for-parameter
def test_execution_limited_parallelism(self):
test_parallelism = 2
- self.execution_parallelism(parallelism=test_parallelism)
+ self.execution_parallelism(parallelism=test_parallelism) # pylint: disable=no-value-for-parameter
@mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
@mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')