You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/15 22:01:47 UTC

[airflow] branch v1-10-test updated (f8a7e66 -> 7d332c5)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard f8a7e66  Further validation that only task commands are run by executors (#9240)
     new 7d332c5  Further validation that only task commands are run by executors (#9240)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f8a7e66)
            \
             N -- N -- N   refs/heads/v1-10-test (7d332c5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/executors/test_kubernetes_executor.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)


[airflow] 01/01: Further validation that only task commands are run by executors (#9240)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7d332c583df359a250428f19e48a2d889787c17a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 12 01:17:43 2020 +0100

    Further validation that only task commands are run by executors (#9240)
    
    (cherry-picked from 99c534e9faf)
---
 airflow/executors/kubernetes_executor.py    |  7 ++-----
 tests/executors/test_dask_executor.py       |  6 +++---
 tests/executors/test_kubernetes_executor.py |  3 ++-
 tests/executors/test_local_executor.py      | 20 +++++++++++++++-----
 4 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index b62462f..8b5fdc1 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -395,11 +395,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
         key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date, try_number = key
 
-        if isinstance(command, str):
-            command = [command]
-
-        if command[0] != "airflow":
-            raise ValueError('The first element of command must be equal to "airflow".')
+        if command[0:2] != ["airflow", "run"]:
+            raise ValueError('The command must start with ["airflow", "run"].')
 
         config_pod = self.worker_configuration.make_pod(
             namespace=self.namespace,
diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py
index f43a54f..2bf9d8e 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -50,12 +50,12 @@ pytestmark = pytest.mark.xfail(condition=True, reason="The Dask executor is expe
 class BaseDaskTest(unittest.TestCase):
 
     def assert_tasks_on_executor(self, executor):
+
+        success_command = ['airflow', 'run', '--help']
+        fail_command = ['airflow', 'run', 'false']
         # start the executor
         executor.start()
 
-        success_command = ['true', 'some_parameter']
-        fail_command = ['false', 'some_parameter']
-
         executor.execute_async(key='success', command=success_command)
         executor.execute_async(key='fail', command=fail_command)
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9073493..77299f6 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -155,7 +155,8 @@ class TestKubernetesExecutor(unittest.TestCase):
         # Execute a task while the Api Throws errors
         try_number = 1
         kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number),
-                                         command='command', executor_config={})
+                                         command=['airflow', 'run', 'true', 'some_parameter'],
+                                         executor_config={})
         kubernetesExecutor.sync()
         kubernetesExecutor.sync()
 
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 3aebbe2..67f0b28 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import subprocess
 import unittest
 from tests.compat import mock
 
@@ -28,13 +29,22 @@ class LocalExecutorTest(unittest.TestCase):
 
     TEST_SUCCESS_COMMANDS = 5
 
-    def execution_parallelism(self, parallelism=0):
+    @mock.patch('airflow.executors.local_executor.subprocess.check_call')
+    def execution_parallelism(self, mock_check_call, parallelism=0):
+        success_command = ['airflow', 'run', 'true', 'some_parameter']
+        fail_command = ['airflow', 'run', 'false']
+
+        def fake_execute_command(command, close_fds=True):  # pylint: disable=unused-argument
+            if command != success_command:
+                raise subprocess.CalledProcessError(returncode=1, cmd=command)
+            else:
+                return 0
+
+        mock_check_call.side_effect = fake_execute_command
         executor = LocalExecutor(parallelism=parallelism)
         executor.start()
 
         success_key = 'success {}'
-        success_command = ['true', 'some_parameter']
-        fail_command = ['false', 'some_parameter']
         self.assertTrue(executor.result_queue.empty())
 
         for i in range(self.TEST_SUCCESS_COMMANDS):
@@ -58,11 +68,11 @@ class LocalExecutorTest(unittest.TestCase):
         self.assertEqual(executor.workers_used, expected)
 
     def test_execution_unlimited_parallelism(self):
-        self.execution_parallelism(parallelism=0)
+        self.execution_parallelism(parallelism=0)  # pylint: disable=no-value-for-parameter
 
     def test_execution_limited_parallelism(self):
         test_parallelism = 2
-        self.execution_parallelism(parallelism=test_parallelism)
+        self.execution_parallelism(parallelism=test_parallelism)  # pylint: disable=no-value-for-parameter
 
     @mock.patch('airflow.executors.local_executor.LocalExecutor.sync')
     @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks')