You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/16 10:21:55 UTC
[airflow] branch master updated: Check for same task instead of
Equality to detect Duplicate Tasks (#8828)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 15273f0 Check for same task instead of Equality to detect Duplicate Tasks (#8828)
15273f0 is described below
commit 15273f0ea05ec579c631ce26b5d620233ebdc4d2
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat May 16 11:21:12 2020 +0100
Check for same task instead of Equality to detect Duplicate Tasks (#8828)
---
airflow/example_dags/example_complex.py | 2 +-
airflow/models/baseoperator.py | 7 ++--
airflow/models/dag.py | 2 +-
.../cloud/example_dags/example_datacatalog.py | 2 +-
.../cloud/example_dags/example_gcs_to_gcs.py | 2 +-
tests/models/test_dag.py | 18 +---------
tests/models/test_taskinstance.py | 16 ++++-----
.../amazon/aws/operators/test_s3_to_sftp.py | 2 +-
.../google/cloud/operators/test_mlengine_utils.py | 42 ++++++++++++----------
tests/providers/google/cloud/sensors/test_gcs.py | 4 +--
.../microsoft/azure/operators/test_file_to_wasb.py | 4 +--
.../azure/operators/test_wasb_delete_blob.py | 4 +--
.../providers/microsoft/azure/sensors/test_wasb.py | 8 ++---
tests/providers/sftp/operators/test_sftp.py | 20 +++++------
tests/sensors/test_base_sensor.py | 11 ++++--
tests/sensors/test_external_task_sensor.py | 8 ++---
tests/sensors/test_sql_sensor.py | 8 ++---
17 files changed, 78 insertions(+), 82 deletions(-)
diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py
index f5e5c35..54e27b8 100644
--- a/airflow/example_dags/example_complex.py
+++ b/airflow/example_dags/example_complex.py
@@ -81,7 +81,7 @@ with models.DAG(
)
create_tag_template_field_result2 = BashOperator(
- task_id="create_tag_template_field_result", bash_command="echo create_tag_template_field_result"
+ task_id="create_tag_template_field_result2", bash_command="echo create_tag_template_field_result"
)
# Delete
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 0223f84..85f01dc 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -36,7 +36,7 @@ from dateutil.relativedelta import relativedelta
from sqlalchemy.orm import Session
from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+from airflow.exceptions import AirflowException
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.base import Operator
from airflow.models.pool import Pool
@@ -600,9 +600,8 @@ class BaseOperator(Operator, LoggingMixin):
"The DAG assigned to {} can not be changed.".format(self))
elif self.task_id not in dag.task_dict:
dag.add_task(self)
- elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] != self:
- raise DuplicateTaskIdFound(
- "Task id '{}' has already been added to the DAG".format(self.task_id))
+ elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+ dag.add_task(self)
self._dag = dag # pylint: disable=attribute-defined-outside-init
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 82322ec..5699ba9 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1337,7 +1337,7 @@ class DAG(BaseDag, LoggingMixin):
elif task.end_date and self.end_date:
task.end_date = min(task.end_date, self.end_date)
- if task.task_id in self.task_dict and self.task_dict[task.task_id] != task:
+ if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
raise DuplicateTaskIdFound(
"Task id '{}' has already been added to the DAG".format(task.task_id))
else:
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
index 7baf33f..55dc4a2 100644
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
@@ -181,7 +181,7 @@ with models.DAG("example_gcp_datacatalog", default_args=default_args, schedule_i
# [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
create_tag_template_field_result2 = BashOperator(
- task_id="create_tag_template_field_result",
+ task_id="create_tag_template_field_result2",
bash_command="echo \"{{ task_instance.xcom_pull('create_tag_template_field') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
index 6e5bd6d..27d5449 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
@@ -100,7 +100,7 @@ with models.DAG(
# [START howto_operator_gcs_to_gcs_delimiter]
copy_files_with_delimiter = GCSToGCSOperator(
- task_id="copy_files_with_wildcard",
+ task_id="copy_files_with_delimiter",
source_bucket=BUCKET_1_SRC,
source_object="data/",
destination_bucket=BUCKET_1_DST,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 2ac0c81..6eb51a4 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -979,14 +979,6 @@ class TestDag(unittest.TestCase):
self.assertEqual(dag.task_dict, {op1.task_id: op1})
- # Also verify that DAGs with duplicate task_ids don't raise errors
- with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
- op3 = DummyOperator(task_id="t3")
- op4 = BashOperator(task_id="t4", bash_command="sleep 1")
- op3 >> op4
-
- self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
-
def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
"""Verify tasks with Duplicate task_id raises error"""
with self.assertRaisesRegex(
@@ -994,19 +986,11 @@ class TestDag(unittest.TestCase):
):
dag = DAG("test_dag", start_date=DEFAULT_DATE)
op1 = DummyOperator(task_id="t1", dag=dag)
- op2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+ op2 = DummyOperator(task_id="t1", dag=dag)
op1 >> op2
self.assertEqual(dag.task_dict, {op1.task_id: op1})
- # Also verify that DAGs with duplicate task_ids don't raise errors
- dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
- op3 = DummyOperator(task_id="t3", dag=dag1)
- op4 = DummyOperator(task_id="t4", dag=dag1)
- op3 >> op4
-
- self.assertEqual(dag1.task_dict, {op3.task_id: op3, op4.task_id: op4})
-
def test_duplicate_task_ids_for_same_task_is_allowed(self):
"""Verify that same tasks with Duplicate task_id do not raise error"""
with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 20b6274..631769e 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -373,20 +373,20 @@ class TestTaskInstance(unittest.TestCase):
"""
test that updating the executor_config propogates to the TaskInstance DB
"""
- dag = models.DAG(dag_id='test_run_pooling_task')
- task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow',
- executor_config={'foo': 'bar'},
- start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
+ with models.DAG(dag_id='test_run_pooling_task') as dag:
+ task = DummyOperator(task_id='test_run_pooling_task_op', owner='airflow',
+ executor_config={'foo': 'bar'},
+ start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
ti = TI(
task=task, execution_date=timezone.utcnow())
ti.run(session=session)
tis = dag.get_task_instances()
self.assertEqual({'foo': 'bar'}, tis[0].executor_config)
-
- task2 = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, owner='airflow',
- executor_config={'bar': 'baz'},
- start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
+ with models.DAG(dag_id='test_run_pooling_task') as dag:
+ task2 = DummyOperator(task_id='test_run_pooling_task_op', owner='airflow',
+ executor_config={'bar': 'baz'},
+ start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))
ti = TI(
task=task2, execution_date=timezone.utcnow())
diff --git a/tests/providers/amazon/aws/operators/test_s3_to_sftp.py b/tests/providers/amazon/aws/operators/test_s3_to_sftp.py
index e1e6ffa..24eda72 100644
--- a/tests/providers/amazon/aws/operators/test_s3_to_sftp.py
+++ b/tests/providers/amazon/aws/operators/test_s3_to_sftp.py
@@ -133,7 +133,7 @@ class TestS3ToSFTPOperator(unittest.TestCase):
def delete_remote_resource(self):
# check the remote file content
remove_file_task = SSHOperator(
- task_id="test_check_file",
+ task_id="test_rm_file",
ssh_hook=self.hook,
command="rm {0}".format(self.sftp_path),
do_xcom_push=True,
diff --git a/tests/providers/google/cloud/operators/test_mlengine_utils.py b/tests/providers/google/cloud/operators/test_mlengine_utils.py
index 692cd3f..13e0e87 100644
--- a/tests/providers/google/cloud/operators/test_mlengine_utils.py
+++ b/tests/providers/google/cloud/operators/test_mlengine_utils.py
@@ -131,16 +131,18 @@ class TestCreateEvaluateOps(unittest.TestCase):
self.assertEqual('err=0.9', result)
def test_failures(self):
- dag = DAG(
- 'test_dag',
- default_args={
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE,
- 'end_date': DEFAULT_DATE,
- 'project_id': 'test-project',
- 'region': 'us-east1',
- },
- schedule_interval='@daily')
+ def create_test_dag(dag_id):
+ dag = DAG(
+ dag_id,
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+ 'end_date': DEFAULT_DATE,
+ 'project_id': 'test-project',
+ 'region': 'us-east1',
+ },
+ schedule_interval='@daily')
+ return dag
input_with_model = self.INPUT_MISSING_ORIGIN.copy()
other_params_but_models = {
@@ -151,26 +153,30 @@ class TestCreateEvaluateOps(unittest.TestCase):
'prediction_path': input_with_model['outputPath'],
'metric_fn_and_keys': (self.metric_fn, ['err']),
'validate_fn': (lambda x: 'err=%.1f' % x['err']),
- 'dag': dag,
}
with self.assertRaisesRegex(AirflowException, 'Missing model origin'):
- mlengine_operator_utils.create_evaluate_ops(**other_params_but_models)
+ mlengine_operator_utils.create_evaluate_ops(
+ dag=create_test_dag('test_dag_1'), **other_params_but_models)
with self.assertRaisesRegex(AirflowException, 'Ambiguous model origin'):
- mlengine_operator_utils.create_evaluate_ops(model_uri='abc', model_name='cde',
- **other_params_but_models)
+ mlengine_operator_utils.create_evaluate_ops(
+ dag=create_test_dag('test_dag_2'), model_uri='abc', model_name='cde',
+ **other_params_but_models)
with self.assertRaisesRegex(AirflowException, 'Ambiguous model origin'):
- mlengine_operator_utils.create_evaluate_ops(model_uri='abc', version_name='vvv',
- **other_params_but_models)
+ mlengine_operator_utils.create_evaluate_ops(
+ dag=create_test_dag('test_dag_3'), model_uri='abc', version_name='vvv',
+ **other_params_but_models)
with self.assertRaisesRegex(AirflowException, '`metric_fn` param must be callable'):
params = other_params_but_models.copy()
params['metric_fn_and_keys'] = (None, ['abc'])
- mlengine_operator_utils.create_evaluate_ops(model_uri='gs://blah', **params)
+ mlengine_operator_utils.create_evaluate_ops(
+ dag=create_test_dag('test_dag_4'), model_uri='gs://blah', **params)
with self.assertRaisesRegex(AirflowException, '`validate_fn` param must be callable'):
params = other_params_but_models.copy()
params['validate_fn'] = None
- mlengine_operator_utils.create_evaluate_ops(model_uri='gs://blah', **params)
+ mlengine_operator_utils.create_evaluate_ops(
+ dag=create_test_dag('test_dag_5'), model_uri='gs://blah', **params)
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index 2da2462..46cec77 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -205,7 +205,7 @@ class TestGCSUploadSessionCompleteSensor(TestCase):
self.dag = dag
self.sensor = GCSUploadSessionCompleteSensor(
- task_id='sensor',
+ task_id='sensor_1',
bucket='test-bucket',
prefix='test-prefix/path',
inactivity_period=12,
@@ -227,7 +227,7 @@ class TestGCSUploadSessionCompleteSensor(TestCase):
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
def test_files_deleted_between_pokes_allow_delete(self):
self.sensor = GCSUploadSessionCompleteSensor(
- task_id='sensor',
+ task_id='sensor_2',
bucket='test-bucket',
prefix='test-prefix/path',
inactivity_period=12,
diff --git a/tests/providers/microsoft/azure/operators/test_file_to_wasb.py b/tests/providers/microsoft/azure/operators/test_file_to_wasb.py
index 909a885..0ad68c8 100644
--- a/tests/providers/microsoft/azure/operators/test_file_to_wasb.py
+++ b/tests/providers/microsoft/azure/operators/test_file_to_wasb.py
@@ -45,7 +45,7 @@ class TestFileToWasbOperator(unittest.TestCase):
def test_init(self):
operator = FileToWasbOperator(
- task_id='wasb_operator',
+ task_id='wasb_operator_1',
dag=self.dag,
**self._config
)
@@ -58,7 +58,7 @@ class TestFileToWasbOperator(unittest.TestCase):
self.assertEqual(operator.retries, self._config['retries'])
operator = FileToWasbOperator(
- task_id='wasb_operator',
+ task_id='wasb_operator_2',
dag=self.dag,
load_options={'timeout': 2},
**self._config
diff --git a/tests/providers/microsoft/azure/operators/test_wasb_delete_blob.py b/tests/providers/microsoft/azure/operators/test_wasb_delete_blob.py
index 776814d..1289312 100644
--- a/tests/providers/microsoft/azure/operators/test_wasb_delete_blob.py
+++ b/tests/providers/microsoft/azure/operators/test_wasb_delete_blob.py
@@ -42,7 +42,7 @@ class TestWasbDeleteBlobOperator(unittest.TestCase):
def test_init(self):
operator = WasbDeleteBlobOperator(
- task_id='wasb_operator',
+ task_id='wasb_operator_1',
dag=self.dag,
**self._config
)
@@ -53,7 +53,7 @@ class TestWasbDeleteBlobOperator(unittest.TestCase):
self.assertEqual(operator.ignore_if_missing, False)
operator = WasbDeleteBlobOperator(
- task_id='wasb_operator',
+ task_id='wasb_operator_2',
dag=self.dag,
is_prefix=True,
ignore_if_missing=True,
diff --git a/tests/providers/microsoft/azure/sensors/test_wasb.py b/tests/providers/microsoft/azure/sensors/test_wasb.py
index 6f1e2d7..74dc0f1 100644
--- a/tests/providers/microsoft/azure/sensors/test_wasb.py
+++ b/tests/providers/microsoft/azure/sensors/test_wasb.py
@@ -43,7 +43,7 @@ class TestWasbBlobSensor(unittest.TestCase):
def test_init(self):
sensor = WasbBlobSensor(
- task_id='wasb_sensor',
+ task_id='wasb_sensor_1',
dag=self.dag,
**self._config
)
@@ -54,7 +54,7 @@ class TestWasbBlobSensor(unittest.TestCase):
self.assertEqual(sensor.timeout, self._config['timeout'])
sensor = WasbBlobSensor(
- task_id='wasb_sensor',
+ task_id='wasb_sensor_2',
dag=self.dag,
check_options={'timeout': 2},
**self._config
@@ -94,7 +94,7 @@ class TestWasbPrefixSensor(unittest.TestCase):
def test_init(self):
sensor = WasbPrefixSensor(
- task_id='wasb_sensor',
+ task_id='wasb_sensor_1',
dag=self.dag,
**self._config
)
@@ -105,7 +105,7 @@ class TestWasbPrefixSensor(unittest.TestCase):
self.assertEqual(sensor.timeout, self._config['timeout'])
sensor = WasbPrefixSensor(
- task_id='wasb_sensor',
+ task_id='wasb_sensor_2',
dag=self.dag,
check_options={'timeout': 2},
**self._config
diff --git a/tests/providers/sftp/operators/test_sftp.py b/tests/providers/sftp/operators/test_sftp.py
index 6b074f3..a3a4433 100644
--- a/tests/providers/sftp/operators/test_sftp.py
+++ b/tests/providers/sftp/operators/test_sftp.py
@@ -75,7 +75,7 @@ class TestSFTPOperator(unittest.TestCase):
# put test file to remote
put_test_task = SFTPOperator(
- task_id="test_sftp",
+ task_id="put_test_task",
ssh_hook=self.hook,
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
@@ -89,7 +89,7 @@ class TestSFTPOperator(unittest.TestCase):
# check the remote file content
check_file_task = SSHOperator(
- task_id="test_check_file",
+ task_id="check_file_task",
ssh_hook=self.hook,
command="cat {0}".format(self.test_remote_filepath),
do_xcom_push=True,
@@ -99,7 +99,7 @@ class TestSFTPOperator(unittest.TestCase):
ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
ti3.run()
self.assertEqual(
- ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
+ ti3.xcom_pull(task_ids=check_file_task.task_id, key='return_value').strip(),
test_local_file_content)
@conf_vars({('core', 'enable_xcom_pickling'): 'True'})
@@ -178,7 +178,7 @@ class TestSFTPOperator(unittest.TestCase):
# put test file to remote
put_test_task = SFTPOperator(
- task_id="test_sftp",
+ task_id="put_test_task",
ssh_hook=self.hook,
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
@@ -191,7 +191,7 @@ class TestSFTPOperator(unittest.TestCase):
# check the remote file content
check_file_task = SSHOperator(
- task_id="test_check_file",
+ task_id="check_file_task",
ssh_hook=self.hook,
command="cat {0}".format(self.test_remote_filepath),
do_xcom_push=True,
@@ -201,7 +201,7 @@ class TestSFTPOperator(unittest.TestCase):
ti3 = TaskInstance(task=check_file_task, execution_date=timezone.utcnow())
ti3.run()
self.assertEqual(
- ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
+ ti3.xcom_pull(task_ids=check_file_task.task_id, key='return_value').strip(),
b64encode(test_local_file_content).decode('utf-8'))
@conf_vars({('core', 'enable_xcom_pickling'): 'True'})
@@ -362,7 +362,7 @@ class TestSFTPOperator(unittest.TestCase):
with self.assertRaisesRegex(AirflowException,
"Cannot operate without ssh_hook or ssh_conn_id."):
task_0 = SFTPOperator(
- task_id="test_sftp",
+ task_id="test_sftp_0",
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
operation=SFTPOperation.PUT,
@@ -372,7 +372,7 @@ class TestSFTPOperator(unittest.TestCase):
# if ssh_hook is invalid/not provided, use ssh_conn_id to create SSHHook
task_1 = SFTPOperator(
- task_id="test_sftp",
+ task_id="test_sftp_1",
ssh_hook="string_rather_than_SSHHook", # invalid ssh_hook
ssh_conn_id=TEST_CONN_ID,
local_filepath=self.test_local_filepath,
@@ -387,7 +387,7 @@ class TestSFTPOperator(unittest.TestCase):
self.assertEqual(task_1.ssh_hook.ssh_conn_id, TEST_CONN_ID)
task_2 = SFTPOperator(
- task_id="test_sftp",
+ task_id="test_sftp_2",
ssh_conn_id=TEST_CONN_ID, # no ssh_hook provided
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
@@ -402,7 +402,7 @@ class TestSFTPOperator(unittest.TestCase):
# if both valid ssh_hook and ssh_conn_id are provided, ignore ssh_conn_id
task_3 = SFTPOperator(
- task_id="test_sftp",
+ task_id="test_sftp_3",
ssh_hook=self.hook,
ssh_conn_id=TEST_CONN_ID,
local_filepath=self.test_local_filepath,
diff --git a/tests/sensors/test_base_sensor.py b/tests/sensors/test_base_sensor.py
index df7697b..cb11519 100644
--- a/tests/sensors/test_base_sensor.py
+++ b/tests/sensors/test_base_sensor.py
@@ -70,16 +70,17 @@ class TestBaseSensor(unittest.TestCase):
state=State.RUNNING
)
- def _make_sensor(self, return_value, **kwargs):
+ def _make_sensor(self, return_value, task_id=SENSOR_OP, **kwargs):
poke_interval = 'poke_interval'
timeout = 'timeout'
+
if poke_interval not in kwargs:
kwargs[poke_interval] = 0
if timeout not in kwargs:
kwargs[timeout] = 0
sensor = DummySensor(
- task_id=SENSOR_OP,
+ task_id=task_id,
return_value=return_value,
dag=self.dag,
**kwargs
@@ -471,17 +472,20 @@ class TestBaseSensor(unittest.TestCase):
positive_poke_interval = 10
with self.assertRaises(AirflowException):
self._make_sensor(
+ task_id='test_sensor_task_1',
return_value=None,
poke_interval=negative_poke_interval,
timeout=25)
with self.assertRaises(AirflowException):
self._make_sensor(
+ task_id='test_sensor_task_2',
return_value=None,
poke_interval=non_number_poke_interval,
timeout=25)
self._make_sensor(
+ task_id='test_sensor_task_3',
return_value=None,
poke_interval=positive_poke_interval,
timeout=25)
@@ -492,17 +496,20 @@ class TestBaseSensor(unittest.TestCase):
positive_timeout = 25
with self.assertRaises(AirflowException):
self._make_sensor(
+ task_id='test_sensor_task_1',
return_value=None,
poke_interval=10,
timeout=negative_timeout)
with self.assertRaises(AirflowException):
self._make_sensor(
+ task_id='test_sensor_task_2',
return_value=None,
poke_interval=10,
timeout=non_number_timeout)
self._make_sensor(
+ task_id='test_sensor_task_3',
return_value=None,
poke_interval=10,
timeout=positive_timeout)
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 0386a77..33672fd 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -278,7 +278,7 @@ exit 0
self.test_time_sensor()
# check that the execution_fn works
op1 = ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
+ task_id='test_external_task_sensor_check_delta_1',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=lambda dt: dt + timedelta(0),
@@ -292,7 +292,7 @@ exit 0
)
# double check that the execution is being called by failing the test
op2 = ExternalTaskSensor(
- task_id='test_external_task_sensor_check_delta',
+ task_id='test_external_task_sensor_check_delta_2',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
execution_date_fn=lambda dt: dt + timedelta(days=1),
@@ -325,7 +325,7 @@ exit 0
def test_catch_invalid_allowed_states(self):
with self.assertRaises(ValueError):
ExternalTaskSensor(
- task_id='test_external_task_sensor_check',
+ task_id='test_external_task_sensor_check_1',
external_dag_id=TEST_DAG_ID,
external_task_id=TEST_TASK_ID,
allowed_states=['invalid_state'],
@@ -334,7 +334,7 @@ exit 0
with self.assertRaises(ValueError):
ExternalTaskSensor(
- task_id='test_external_task_sensor_check',
+ task_id='test_external_task_sensor_check_2',
external_dag_id=TEST_DAG_ID,
external_task_id=None,
allowed_states=['invalid_state'],
diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py
index b4274c2..ac58f26 100644
--- a/tests/sensors/test_sql_sensor.py
+++ b/tests/sensors/test_sql_sensor.py
@@ -55,7 +55,7 @@ class TestSqlSensor(TestHiveEnvironment):
@pytest.mark.backend("mysql")
def test_sql_sensor_mysql(self):
op1 = SqlSensor(
- task_id='sql_sensor_check',
+ task_id='sql_sensor_check_1',
conn_id='mysql_default',
sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
dag=self.dag
@@ -63,7 +63,7 @@ class TestSqlSensor(TestHiveEnvironment):
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
op2 = SqlSensor(
- task_id='sql_sensor_check',
+ task_id='sql_sensor_check_2',
conn_id='mysql_default',
sql="SELECT count(%s) FROM INFORMATION_SCHEMA.TABLES",
parameters=["table_name"],
@@ -74,7 +74,7 @@ class TestSqlSensor(TestHiveEnvironment):
@pytest.mark.backend("postgres")
def test_sql_sensor_postgres(self):
op1 = SqlSensor(
- task_id='sql_sensor_check',
+ task_id='sql_sensor_check_1',
conn_id='postgres_default',
sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
dag=self.dag
@@ -82,7 +82,7 @@ class TestSqlSensor(TestHiveEnvironment):
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
op2 = SqlSensor(
- task_id='sql_sensor_check',
+ task_id='sql_sensor_check_2',
conn_id='postgres_default',
sql="SELECT count(%s) FROM INFORMATION_SCHEMA.TABLES",
parameters=["table_name"],