You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 08:26:17 UTC

incubator-airflow git commit: [AIRFLOW-1436][AIRFLOW-1475] EmrJobFlowSensor considers Cancelled step as Successful

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 61419ddc8 -> 404bee8d8


[AIRFLOW-1436][AIRFLOW-1475] EmrJobFlowSensor considers Cancelled step as Successful

Closes #2937 from Swalloow/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/404bee8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/404bee8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/404bee8d

Branch: refs/heads/master
Commit: 404bee8d859ee110ae1cbc80372aadf36edc58a5
Parents: 61419dd
Author: Swalloow <sw...@gmail.com>
Authored: Fri Jan 12 09:26:10 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 09:26:10 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/sensors/emr_base_sensor.py     |  2 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py |  2 +-
 airflow/contrib/sensors/emr_step_sensor.py     |  4 +-
 tests/contrib/sensors/test_emr_base_sensor.py  |  9 +--
 tests/contrib/sensors/test_emr_step_sensor.py  | 87 +++++++++++++++------
 5 files changed, 69 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 3ecaa42..c6f96f8 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -45,7 +45,7 @@ class EmrBaseSensor(BaseSensorOperator):
         if state in self.NON_TERMINAL_STATES:
             return False
 
-        if state == self.FAILED_STATE:
+        if state in self.FAILED_STATE:
             raise AirflowException('EMR job failed')
 
         return True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index 87b65c8..a437fc3 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -26,7 +26,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
     """
 
     NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING']
-    FAILED_STATE = 'TERMINATED_WITH_ERRORS'
+    FAILED_STATE = ['TERMINATED_WITH_ERRORS']
     template_fields = ['job_flow_id']
     template_ext = ()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index 003d2d1..c5a450d 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -21,14 +21,14 @@ class EmrStepSensor(EmrBaseSensor):
     Asks for the state of the step until it reaches a terminal state.
     If it fails the sensor errors, failing the task.
 
-    :param job_flow_id: job_flow_idwhich contains the step check the state of
+    :param job_flow_id: job_flow_id which contains the step check the state of
     :type job_flow_id: string
     :param step_id: step to check the state of
     :type step_id: string
     """
 
     NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-    FAILED_STATE = 'FAILED'
+    FAILED_STATE = ['CANCELLED', 'FAILED']
     template_fields = ['job_flow_id', 'step_id']
     template_ext = ()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py
index 9c39abb..970d189 100644
--- a/tests/contrib/sensors/test_emr_base_sensor.py
+++ b/tests/contrib/sensors/test_emr_base_sensor.py
@@ -26,7 +26,7 @@ class TestEmrBaseSensor(unittest.TestCase):
     def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):
             NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
+            FAILED_STATE = ['FAILED']
 
             def get_emr_response(self):
                 return {
@@ -49,7 +49,7 @@ class TestEmrBaseSensor(unittest.TestCase):
     def test_poke_returns_false_when_state_is_a_non_terminal_state(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):
             NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
+            FAILED_STATE = ['FAILED']
 
             def get_emr_response(self):
                 return {
@@ -72,7 +72,7 @@ class TestEmrBaseSensor(unittest.TestCase):
     def test_poke_returns_false_when_http_response_is_bad(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):
             NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
+            FAILED_STATE = ['FAILED']
 
             def get_emr_response(self):
                 return {
@@ -96,7 +96,7 @@ class TestEmrBaseSensor(unittest.TestCase):
     def test_poke_raises_error_when_job_has_failed(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):
             NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
+            FAILED_STATE = ['FAILED']
 
             def get_emr_response(self):
                 return {
@@ -118,7 +118,6 @@ class TestEmrBaseSensor(unittest.TestCase):
 
             operator.poke(None)
 
-
         self.assertIn('EMR job failed', str(context.exception))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_step_sensor.py b/tests/contrib/sensors/test_emr_step_sensor.py
index 58ee461..b5d43fb 100644
--- a/tests/contrib/sensors/test_emr_step_sensor.py
+++ b/tests/contrib/sensors/test_emr_step_sensor.py
@@ -13,12 +13,11 @@
 # limitations under the License.
 
 import unittest
-import datetime
-from dateutil.tz import tzlocal
+from datetime import datetime
 from mock import MagicMock, patch
-import boto3
+from dateutil.tz import tzlocal
 
-from airflow import configuration
+from airflow import configuration, AirflowException
 from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
 
 DESCRIBE_JOB_STEP_RUNNING_RETURN = {
@@ -43,8 +42,37 @@ DESCRIBE_JOB_STEP_RUNNING_RETURN = {
             'State': 'RUNNING',
             'StateChangeReason': {},
             'Timeline': {
-                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
-                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+                'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+                'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
+            }
+        }
+    }
+}
+
+DESCRIBE_JOB_STEP_CANCELLED_RETURN = {
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+        'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
+    },
+    'Step': {
+        'ActionOnFailure': 'CONTINUE',
+        'Config': {
+            'Args': [
+                '/usr/lib/spark/bin/run-example',
+                'SparkPi',
+                '10'
+            ],
+            'Jar': 'command-runner.jar',
+            'Properties': {}
+        },
+        'Id': 's-VK57YR1Z9Z5N',
+        'Name': 'calculate_pi',
+        'Status': {
+            'State': 'CANCELLED',
+            'StateChangeReason': {},
+            'Timeline': {
+                'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+                'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
             }
         }
     }
@@ -72,8 +100,8 @@ DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
             'State': 'COMPLETED',
             'StateChangeReason': {},
             'Timeline': {
-                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
-                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+                'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+                'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
             }
         }
     }
@@ -84,35 +112,42 @@ class TestEmrStepSensor(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
 
-        # Mock out the emr_client (moto has incorrect response)
-        self.mock_emr_client = MagicMock()
-        self.mock_emr_client.describe_step.side_effect = [
+        self.emr_client_mock = MagicMock()
+        self.sensor = EmrStepSensor(
+            task_id='test_task',
+            poke_interval=1,
+            job_flow_id='j-8989898989',
+            step_id='s-VK57YR1Z9Z5N',
+            aws_conn_id='aws_default',
+        )
+
+    def test_step_completed(self):
+        self.emr_client_mock.describe_step.side_effect = [
             DESCRIBE_JOB_STEP_RUNNING_RETURN,
             DESCRIBE_JOB_STEP_COMPLETED_RETURN
         ]
 
-        # Mock out the emr_client creator
-        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
-
+        self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
 
-    def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self):
         with patch('boto3.client', self.boto3_client_mock):
+            self.sensor.execute(None)
 
-            operator = EmrStepSensor(
-                task_id='test_task',
-                poke_interval=1,
-                job_flow_id='j-8989898989',
-                step_id='s-VK57YR1Z9Z5N',
-                aws_conn_id='aws_default',
+            self.assertEqual(self.emr_client_mock.describe_step.call_count, 2)
+            self.emr_client_mock.describe_step.assert_called_with(
+                ClusterId='j-8989898989',
+                StepId='s-VK57YR1Z9Z5N'
             )
 
-            operator.execute(None)
+    def test_step_cancelled(self):
+        self.emr_client_mock.describe_step.side_effect = [
+            DESCRIBE_JOB_STEP_RUNNING_RETURN,
+            DESCRIBE_JOB_STEP_CANCELLED_RETURN
+        ]
 
-            # make sure we called twice
-            self.assertEqual(self.mock_emr_client.describe_step.call_count, 2)
+        self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
 
-            # make sure it was called with the job_flow_id and step_id
-            self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N')
+        with patch('boto3.client', self.boto3_client_mock):
+            self.assertRaises(AirflowException, self.sensor.execute, None)
 
 
 if __name__ == '__main__':