You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 08:26:17 UTC
incubator-airflow git commit: [AIRFLOW-1436][AIRFLOW-1475] EmrJobFlowSensor considers Cancelled step as Successful
Repository: incubator-airflow
Updated Branches:
refs/heads/master 61419ddc8 -> 404bee8d8
[AIRFLOW-1436][AIRFLOW-1475] EmrJobFlowSensor considers Cancelled step as Successful
Closes #2937 from Swalloow/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/404bee8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/404bee8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/404bee8d
Branch: refs/heads/master
Commit: 404bee8d859ee110ae1cbc80372aadf36edc58a5
Parents: 61419dd
Author: Swalloow <sw...@gmail.com>
Authored: Fri Jan 12 09:26:10 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 09:26:10 2018 +0100
----------------------------------------------------------------------
airflow/contrib/sensors/emr_base_sensor.py | 2 +-
airflow/contrib/sensors/emr_job_flow_sensor.py | 2 +-
airflow/contrib/sensors/emr_step_sensor.py | 4 +-
tests/contrib/sensors/test_emr_base_sensor.py | 9 +--
tests/contrib/sensors/test_emr_step_sensor.py | 87 +++++++++++++++------
5 files changed, 69 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 3ecaa42..c6f96f8 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -45,7 +45,7 @@ class EmrBaseSensor(BaseSensorOperator):
if state in self.NON_TERMINAL_STATES:
return False
- if state == self.FAILED_STATE:
+ if state in self.FAILED_STATE:
raise AirflowException('EMR job failed')
return True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index 87b65c8..a437fc3 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -26,7 +26,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
"""
NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING']
- FAILED_STATE = 'TERMINATED_WITH_ERRORS'
+ FAILED_STATE = ['TERMINATED_WITH_ERRORS']
template_fields = ['job_flow_id']
template_ext = ()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index 003d2d1..c5a450d 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -21,14 +21,14 @@ class EmrStepSensor(EmrBaseSensor):
Asks for the state of the step until it reaches a terminal state.
If it fails the sensor errors, failing the task.
- :param job_flow_id: job_flow_idwhich contains the step check the state of
+ :param job_flow_id: job_flow_id which contains the step check the state of
:type job_flow_id: string
:param step_id: step to check the state of
:type step_id: string
"""
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
- FAILED_STATE = 'FAILED'
+ FAILED_STATE = ['CANCELLED', 'FAILED']
template_fields = ['job_flow_id', 'step_id']
template_ext = ()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py
index 9c39abb..970d189 100644
--- a/tests/contrib/sensors/test_emr_base_sensor.py
+++ b/tests/contrib/sensors/test_emr_base_sensor.py
@@ -26,7 +26,7 @@ class TestEmrBaseSensor(unittest.TestCase):
def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self):
class EmrBaseSensorSubclass(EmrBaseSensor):
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
- FAILED_STATE = 'FAILED'
+ FAILED_STATE = ['FAILED']
def get_emr_response(self):
return {
@@ -49,7 +49,7 @@ class TestEmrBaseSensor(unittest.TestCase):
def test_poke_returns_false_when_state_is_a_non_terminal_state(self):
class EmrBaseSensorSubclass(EmrBaseSensor):
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
- FAILED_STATE = 'FAILED'
+ FAILED_STATE = ['FAILED']
def get_emr_response(self):
return {
@@ -72,7 +72,7 @@ class TestEmrBaseSensor(unittest.TestCase):
def test_poke_returns_false_when_http_response_is_bad(self):
class EmrBaseSensorSubclass(EmrBaseSensor):
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
- FAILED_STATE = 'FAILED'
+ FAILED_STATE = ['FAILED']
def get_emr_response(self):
return {
@@ -96,7 +96,7 @@ class TestEmrBaseSensor(unittest.TestCase):
def test_poke_raises_error_when_job_has_failed(self):
class EmrBaseSensorSubclass(EmrBaseSensor):
NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
- FAILED_STATE = 'FAILED'
+ FAILED_STATE = ['FAILED']
def get_emr_response(self):
return {
@@ -118,7 +118,6 @@ class TestEmrBaseSensor(unittest.TestCase):
operator.poke(None)
-
self.assertIn('EMR job failed', str(context.exception))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_step_sensor.py b/tests/contrib/sensors/test_emr_step_sensor.py
index 58ee461..b5d43fb 100644
--- a/tests/contrib/sensors/test_emr_step_sensor.py
+++ b/tests/contrib/sensors/test_emr_step_sensor.py
@@ -13,12 +13,11 @@
# limitations under the License.
import unittest
-import datetime
-from dateutil.tz import tzlocal
+from datetime import datetime
from mock import MagicMock, patch
-import boto3
+from dateutil.tz import tzlocal
-from airflow import configuration
+from airflow import configuration, AirflowException
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
DESCRIBE_JOB_STEP_RUNNING_RETURN = {
@@ -43,8 +42,37 @@ DESCRIBE_JOB_STEP_RUNNING_RETURN = {
'State': 'RUNNING',
'StateChangeReason': {},
'Timeline': {
- 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
- 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+ 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+ 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
+ }
+ }
+ }
+}
+
+DESCRIBE_JOB_STEP_CANCELLED_RETURN = {
+ 'ResponseMetadata': {
+ 'HTTPStatusCode': 200,
+ 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
+ },
+ 'Step': {
+ 'ActionOnFailure': 'CONTINUE',
+ 'Config': {
+ 'Args': [
+ '/usr/lib/spark/bin/run-example',
+ 'SparkPi',
+ '10'
+ ],
+ 'Jar': 'command-runner.jar',
+ 'Properties': {}
+ },
+ 'Id': 's-VK57YR1Z9Z5N',
+ 'Name': 'calculate_pi',
+ 'Status': {
+ 'State': 'CANCELLED',
+ 'StateChangeReason': {},
+ 'Timeline': {
+ 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+ 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
@@ -72,8 +100,8 @@ DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
'State': 'COMPLETED',
'StateChangeReason': {},
'Timeline': {
- 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
- 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+ 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()),
+ 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal())
}
}
}
@@ -84,35 +112,42 @@ class TestEmrStepSensor(unittest.TestCase):
def setUp(self):
configuration.load_test_config()
- # Mock out the emr_client (moto has incorrect response)
- self.mock_emr_client = MagicMock()
- self.mock_emr_client.describe_step.side_effect = [
+ self.emr_client_mock = MagicMock()
+ self.sensor = EmrStepSensor(
+ task_id='test_task',
+ poke_interval=1,
+ job_flow_id='j-8989898989',
+ step_id='s-VK57YR1Z9Z5N',
+ aws_conn_id='aws_default',
+ )
+
+ def test_step_completed(self):
+ self.emr_client_mock.describe_step.side_effect = [
DESCRIBE_JOB_STEP_RUNNING_RETURN,
DESCRIBE_JOB_STEP_COMPLETED_RETURN
]
- # Mock out the emr_client creator
- self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
-
+ self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
- def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self):
with patch('boto3.client', self.boto3_client_mock):
+ self.sensor.execute(None)
- operator = EmrStepSensor(
- task_id='test_task',
- poke_interval=1,
- job_flow_id='j-8989898989',
- step_id='s-VK57YR1Z9Z5N',
- aws_conn_id='aws_default',
+ self.assertEqual(self.emr_client_mock.describe_step.call_count, 2)
+ self.emr_client_mock.describe_step.assert_called_with(
+ ClusterId='j-8989898989',
+ StepId='s-VK57YR1Z9Z5N'
)
- operator.execute(None)
+ def test_step_cancelled(self):
+ self.emr_client_mock.describe_step.side_effect = [
+ DESCRIBE_JOB_STEP_RUNNING_RETURN,
+ DESCRIBE_JOB_STEP_CANCELLED_RETURN
+ ]
- # make sure we called twice
- self.assertEqual(self.mock_emr_client.describe_step.call_count, 2)
+ self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock)
- # make sure it was called with the job_flow_id and step_id
- self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N')
+ with patch('boto3.client', self.boto3_client_mock):
+ self.assertRaises(AirflowException, self.sensor.execute, None)
if __name__ == '__main__':