You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/15 16:37:30 UTC
[GitHub] kaxil closed pull request #3887: [AIRFLOW-3044] Dataflow operators
accept templated job_name param
kaxil closed pull request #3887: [AIRFLOW-3044] Dataflow operators accept templated job_name param
URL: https://github.com/apache/incubator-airflow/pull/3887
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index 95abce632b..4fdb07c74d 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -190,11 +190,9 @@ def get_conn(self):
return build(
'dataflow', 'v1b3', http=http_authorized, cache_discovery=False)
- def _start_dataflow(self, task_id, variables, name,
- command_prefix, label_formatter):
+ def _start_dataflow(self, variables, name, command_prefix, label_formatter):
variables = self._set_variables(variables)
- cmd = command_prefix + self._build_cmd(task_id, variables,
- label_formatter)
+ cmd = command_prefix + self._build_cmd(variables, label_formatter)
job_id = _Dataflow(cmd).wait_for_done()
_DataflowJob(self.get_conn(), variables['project'], name,
variables['region'],
@@ -208,9 +206,9 @@ def _set_variables(variables):
variables['region'] = DEFAULT_DATAFLOW_LOCATION
return variables
- def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
+ def start_java_dataflow(self, job_name, variables, dataflow, job_class=None,
append_job_name=True):
- name = self._build_dataflow_job_name(task_id, append_job_name)
+ name = self._build_dataflow_job_name(job_name, append_job_name)
variables['jobName'] = name
def label_formatter(labels_dict):
@@ -218,48 +216,44 @@ def label_formatter(labels_dict):
json.dumps(labels_dict).replace(' ', ''))]
command_prefix = (["java", "-cp", dataflow, job_class] if job_class
else ["java", "-jar", dataflow])
- self._start_dataflow(task_id, variables, name,
- command_prefix, label_formatter)
+ self._start_dataflow(variables, name, command_prefix, label_formatter)
- def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
+ def start_template_dataflow(self, job_name, variables, parameters, dataflow_template,
append_job_name=True):
- name = self._build_dataflow_job_name(task_id, append_job_name)
+ name = self._build_dataflow_job_name(job_name, append_job_name)
self._start_template_dataflow(
name, variables, parameters, dataflow_template)
- def start_python_dataflow(self, task_id, variables, dataflow, py_options,
+ def start_python_dataflow(self, job_name, variables, dataflow, py_options,
append_job_name=True):
- name = self._build_dataflow_job_name(task_id, append_job_name)
+ name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
- # TODO: Change python2 to python when Beam supports both python 2 and 3
- # Remember to change the test case too
- self._start_dataflow(task_id, variables, name,
- ["python2"] + py_options + [dataflow],
+ self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)
@staticmethod
- def _build_dataflow_job_name(task_id, append_job_name=True):
- task_id = str(task_id).replace('_', '-')
+ def _build_dataflow_job_name(job_name, append_job_name=True):
+ base_job_name = str(job_name).replace('_', '-')
- if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", task_id):
+ if not re.match(r"^[a-z]([-a-z0-9]*[a-z0-9])?$", base_job_name):
raise ValueError(
'Invalid job_name ({}); the name must consist of'
'only the characters [-a-z0-9], starting with a '
- 'letter and ending with a letter or number '.format(task_id))
+ 'letter and ending with a letter or number '.format(base_job_name))
if append_job_name:
- job_name = task_id + "-" + str(uuid.uuid4())[:8]
+ safe_job_name = base_job_name + "-" + str(uuid.uuid4())[:8]
else:
- job_name = task_id
+ safe_job_name = base_job_name
- return job_name
+ return safe_job_name
@staticmethod
- def _build_cmd(task_id, variables, label_formatter):
+ def _build_cmd(variables, label_formatter):
command = ["--runner=DataflowRunner"]
if variables is not None:
for attr, value in variables.items():
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 1a0d447ef5..7f7a18495d 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -37,8 +37,12 @@ class DataFlowJavaOperator(BaseOperator):
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- :param jar: The reference to a self executing DataFlow jar.
+ :param jar: The reference to a self executing DataFlow jar (templated).
:type jar: str
+ :param job_name: The 'jobName' to use when executing the DataFlow job
+ (templated). This ends up being set in the pipeline options, so any entry
+ with key ``'jobName'`` in ``options`` will be overwritten.
+ :type job_name: str
:param dataflow_default_options: Map of default job options.
:type dataflow_default_options: dict
:param options: Map of job specific options.
@@ -58,7 +62,7 @@ class DataFlowJavaOperator(BaseOperator):
is often not the main class configured in the dataflow jar file.
:type job_class: str
- Both ``jar`` and ``options`` are templated so you can use variables in them.
+ ``jar``, ``options``, and ``job_name`` are templated so you can use variables in them.
Note that both
``dataflow_default_options`` and ``options`` will be merged to specify pipeline
@@ -100,13 +104,14 @@ class DataFlowJavaOperator(BaseOperator):
dag=my-dag)
"""
- template_fields = ['options', 'jar']
+ template_fields = ['options', 'jar', 'job_name']
ui_color = '#0273d4'
@apply_defaults
def __init__(
self,
jar,
+ job_name='{{task.task_id}}',
dataflow_default_options=None,
options=None,
gcp_conn_id='google_cloud_default',
@@ -124,6 +129,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.jar = jar
+ self.job_name = job_name
self.dataflow_default_options = dataflow_default_options
self.options = options
self.poll_sleep = poll_sleep
@@ -140,7 +146,7 @@ def execute(self, context):
dataflow_options = copy.copy(self.dataflow_default_options)
dataflow_options.update(self.options)
- hook.start_java_dataflow(self.task_id, dataflow_options,
+ hook.start_java_dataflow(self.job_name, dataflow_options,
self.jar, self.job_class)
@@ -151,6 +157,8 @@ class DataflowTemplateOperator(BaseOperator):
:param template: The reference to the DataFlow template.
:type template: str
+ :param job_name: The 'jobName' to use when executing the DataFlow template
+ (templated).
:param dataflow_default_options: Map of default job environment options.
:type dataflow_default_options: dict
:param parameters: Map of job specific parameters for the template.
@@ -201,8 +209,8 @@ class DataflowTemplateOperator(BaseOperator):
gcp_conn_id='gcp-airflow-service-account',
dag=my-dag)
- ``template``, ``dataflow_default_options`` and ``parameters`` are templated so you can
- use variables in them.
+ ``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are
+ templated so you can use variables in them.
Note that ``dataflow_default_options`` is expected to save high-level options
for project information, which apply to all dataflow operators in the DAG.
@@ -214,13 +222,14 @@ class DataflowTemplateOperator(BaseOperator):
For more detail on job template execution have a look at the reference:
https://cloud.google.com/dataflow/docs/templates/executing-templates
"""
- template_fields = ['parameters', 'dataflow_default_options', 'template']
+ template_fields = ['parameters', 'dataflow_default_options', 'template', 'job_name']
ui_color = '#0273d4'
@apply_defaults
def __init__(
self,
template,
+ job_name='{{task.task_id}}',
dataflow_default_options=None,
parameters=None,
gcp_conn_id='google_cloud_default',
@@ -238,6 +247,7 @@ def __init__(
self.dataflow_default_options = dataflow_default_options
self.poll_sleep = poll_sleep
self.template = template
+ self.job_name = job_name
self.parameters = parameters
def execute(self, context):
@@ -245,7 +255,7 @@ def execute(self, context):
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
- hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
+ hook.start_template_dataflow(self.job_name, self.dataflow_default_options,
self.parameters, self.template)
@@ -264,6 +274,10 @@ class DataFlowPythonOperator(BaseOperator):
:param py_file: Reference to the python dataflow pipleline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file.
:type py_file: str
+ :param job_name: The 'job_name' to use when executing the DataFlow job
+ (templated). This ends up being set in the pipeline options, so any entry
+ with key ``'jobName'`` or ``'job_name'`` in ``options`` will be overwritten.
+ :type job_name: str
:param py_options: Additional python options.
:type pyt_options: list of strings, e.g., ["-m", "-v"].
:param dataflow_default_options: Map of default job options.
@@ -282,12 +296,13 @@ class DataFlowPythonOperator(BaseOperator):
JOB_STATE_RUNNING state.
:type poll_sleep: int
"""
- template_fields = ['options', 'dataflow_default_options']
+ template_fields = ['options', 'dataflow_default_options', 'job_name']
@apply_defaults
def __init__(
self,
py_file,
+ job_name='{{task.task_id}}',
py_options=None,
dataflow_default_options=None,
options=None,
@@ -300,6 +315,7 @@ def __init__(
super(DataFlowPythonOperator, self).__init__(*args, **kwargs)
self.py_file = py_file
+ self.job_name = job_name
self.py_options = py_options or []
self.dataflow_default_options = dataflow_default_options or {}
self.options = options or {}
@@ -325,7 +341,7 @@ def execute(self, context):
formatted_options = {camel_to_snake(key): dataflow_options[key]
for key in dataflow_options}
hook.start_python_dataflow(
- self.task_id, formatted_options,
+ self.job_name, formatted_options,
self.py_file, self.py_options)
diff --git a/tests/contrib/hooks/test_gcp_dataflow_hook.py b/tests/contrib/hooks/test_gcp_dataflow_hook.py
index 27a1f4764b..c18e8df6f3 100644
--- a/tests/contrib/hooks/test_gcp_dataflow_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -35,6 +35,7 @@
TASK_ID = 'test-dataflow-operator'
+JOB_NAME = 'test-dataflow-pipeline'
TEMPLATE = 'gs://dataflow-templates/wordcount/template_file'
PARAMETERS = {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
@@ -92,14 +93,14 @@ def test_start_python_dataflow(self, mock_conn,
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_python_dataflow(
- task_id=TASK_ID, variables=DATAFLOW_OPTIONS_PY,
+ job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_PY,
dataflow=PY_FILE, py_options=PY_OPTIONS)
EXPECTED_CMD = ['python2', '-m', PY_FILE,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--labels=foo=bar',
'--staging_location=gs://test/staging',
- '--job_name={}-{}'.format(TASK_ID, MOCK_UUID)]
+ '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))
@@ -116,14 +117,14 @@ def test_start_java_dataflow(self, mock_conn,
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_java_dataflow(
- task_id=TASK_ID, variables=DATAFLOW_OPTIONS_JAVA,
+ job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_JAVA,
dataflow=JAR_FILE)
EXPECTED_CMD = ['java', '-jar', JAR_FILE,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--stagingLocation=gs://test/staging',
'--labels={"foo":"bar"}',
- '--jobName={}-{}'.format(TASK_ID, MOCK_UUID)]
+ '--jobName={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))
@@ -140,18 +141,17 @@ def test_start_java_dataflow_with_job_class(
dataflowjob_instance = mock_dataflowjob.return_value
dataflowjob_instance.wait_for_done.return_value = None
self.dataflow_hook.start_java_dataflow(
- task_id=TASK_ID, variables=DATAFLOW_OPTIONS_JAVA,
+ job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_JAVA,
dataflow=JAR_FILE, job_class=JOB_CLASS)
EXPECTED_CMD = ['java', '-cp', JAR_FILE, JOB_CLASS,
'--region=us-central1',
'--runner=DataflowRunner', '--project=test',
'--stagingLocation=gs://test/staging',
'--labels={"foo":"bar"}',
- '--jobName={}-{}'.format(TASK_ID, MOCK_UUID)]
+ '--jobName={}-{}'.format(JOB_NAME, MOCK_UUID)]
self.assertListEqual(sorted(mock_dataflow.call_args[0][0]),
sorted(EXPECTED_CMD))
-
@mock.patch('airflow.contrib.hooks.gcp_dataflow_hook._Dataflow.log')
@mock.patch('subprocess.Popen')
@mock.patch('select.select')
@@ -178,18 +178,18 @@ def poll_resp_error():
def test_valid_dataflow_job_name(self):
job_name = self.dataflow_hook._build_dataflow_job_name(
- task_id=TASK_ID, append_job_name=False
+ job_name=JOB_NAME, append_job_name=False
)
- self.assertEquals(job_name, TASK_ID)
+ self.assertEquals(job_name, JOB_NAME)
- def test_fix_underscore_in_task_id(self):
- task_id_with_underscore = 'test_example'
- fixed_job_name = task_id_with_underscore.replace(
+ def test_fix_underscore_in_job_name(self):
+ job_name_with_underscore = 'test_example'
+ fixed_job_name = job_name_with_underscore.replace(
'_', '-'
)
job_name = self.dataflow_hook._build_dataflow_job_name(
- task_id=task_id_with_underscore, append_job_name=False
+ job_name=job_name_with_underscore, append_job_name=False
)
self.assertEquals(job_name, fixed_job_name)
@@ -201,7 +201,7 @@ def test_invalid_dataflow_job_name(self):
with self.assertRaises(ValueError) as e:
self.dataflow_hook._build_dataflow_job_name(
- task_id=invalid_job_name, append_job_name=False
+ job_name=invalid_job_name, append_job_name=False
)
# Test whether the job_name is present in the Error msg
self.assertIn('Invalid job_name ({})'.format(fixed_name),
@@ -210,37 +210,37 @@ def test_invalid_dataflow_job_name(self):
def test_dataflow_job_regex_check(self):
self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
- task_id='df-job-1', append_job_name=False
+ job_name='df-job-1', append_job_name=False
), 'df-job-1')
self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
- task_id='df-job', append_job_name=False
+ job_name='df-job', append_job_name=False
), 'df-job')
self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
- task_id='dfjob', append_job_name=False
+ job_name='dfjob', append_job_name=False
), 'dfjob')
self.assertEquals(self.dataflow_hook._build_dataflow_job_name(
- task_id='dfjob1', append_job_name=False
+ job_name='dfjob1', append_job_name=False
), 'dfjob1')
self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
- task_id='1dfjob', append_job_name=False
+ job_name='1dfjob', append_job_name=False
)
self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
- task_id='dfjob@', append_job_name=False
+ job_name='dfjob@', append_job_name=False
)
self.assertRaises(
ValueError,
self.dataflow_hook._build_dataflow_job_name,
- task_id='df^jo', append_job_name=False
+ job_name='df^jo', append_job_name=False
)
@@ -254,7 +254,7 @@ def setUp(self):
@mock.patch(DATAFLOW_STRING.format('DataFlowHook._start_template_dataflow'))
def test_start_template_dataflow(self, internal_dataflow_mock):
self.dataflow_hook.start_template_dataflow(
- task_id=TASK_ID, variables=DATAFLOW_OPTIONS_TEMPLATE, parameters=PARAMETERS,
+ job_name=JOB_NAME, variables=DATAFLOW_OPTIONS_TEMPLATE, parameters=PARAMETERS,
dataflow_template=TEMPLATE)
internal_dataflow_mock.assert_called_once_with(
mock.ANY, DATAFLOW_OPTIONS_TEMPLATE, PARAMETERS, TEMPLATE)
diff --git a/tests/contrib/operators/test_dataflow_operator.py b/tests/contrib/operators/test_dataflow_operator.py
index 8ed4583255..b951503eb5 100644
--- a/tests/contrib/operators/test_dataflow_operator.py
+++ b/tests/contrib/operators/test_dataflow_operator.py
@@ -36,6 +36,7 @@
TASK_ID = 'test-dataflow-operator'
+JOB_NAME = 'test-dataflow-pipeline'
TEMPLATE = 'gs://dataflow-templates/wordcount/template_file'
PARAMETERS = {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
@@ -74,6 +75,7 @@ def setUp(self):
self.dataflow = DataFlowPythonOperator(
task_id=TASK_ID,
py_file=PY_FILE,
+ job_name=JOB_NAME,
py_options=PY_OPTIONS,
dataflow_default_options=DEFAULT_OPTIONS_PYTHON,
options=ADDITIONAL_OPTIONS,
@@ -82,6 +84,7 @@ def setUp(self):
def test_init(self):
"""Test DataFlowPythonOperator instance is properly initialized."""
self.assertEqual(self.dataflow.task_id, TASK_ID)
+ self.assertEqual(self.dataflow.job_name, JOB_NAME)
self.assertEqual(self.dataflow.py_file, PY_FILE)
self.assertEqual(self.dataflow.py_options, PY_OPTIONS)
self.assertEqual(self.dataflow.poll_sleep, POLL_SLEEP)
@@ -108,8 +111,8 @@ def test_exec(self, gcs_hook, dataflow_mock):
'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION}
}
gcs_download_hook.assert_called_once_with(PY_FILE)
- start_python_hook.assert_called_once_with(TASK_ID, expected_options,
- mock.ANY, PY_OPTIONS)
+ start_python_hook.assert_called_once_with(JOB_NAME, expected_options, mock.ANY,
+ PY_OPTIONS)
self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
@@ -119,6 +122,7 @@ def setUp(self):
self.dataflow = DataFlowJavaOperator(
task_id=TASK_ID,
jar=JAR_FILE,
+ job_name=JOB_NAME,
job_class=JOB_CLASS,
dataflow_default_options=DEFAULT_OPTIONS_JAVA,
options=ADDITIONAL_OPTIONS,
@@ -127,6 +131,7 @@ def setUp(self):
def test_init(self):
"""Test DataflowTemplateOperator instance is properly initialized."""
self.assertEqual(self.dataflow.task_id, TASK_ID)
+ self.assertEqual(self.dataflow.job_name, JOB_NAME)
self.assertEqual(self.dataflow.poll_sleep, POLL_SLEEP)
self.assertEqual(self.dataflow.dataflow_default_options,
DEFAULT_OPTIONS_JAVA)
@@ -147,7 +152,7 @@ def test_exec(self, gcs_hook, dataflow_mock):
self.dataflow.execute(None)
self.assertTrue(dataflow_mock.called)
gcs_download_hook.assert_called_once_with(JAR_FILE)
- start_java_hook.assert_called_once_with(TASK_ID, mock.ANY,
+ start_java_hook.assert_called_once_with(JOB_NAME, mock.ANY,
mock.ANY, JOB_CLASS)
@@ -157,6 +162,7 @@ def setUp(self):
self.dataflow = DataflowTemplateOperator(
task_id=TASK_ID,
template=TEMPLATE,
+ job_name=JOB_NAME,
parameters=PARAMETERS,
dataflow_default_options=DEFAULT_OPTIONS_TEMPLATE,
poll_sleep=POLL_SLEEP)
@@ -164,6 +170,7 @@ def setUp(self):
def test_init(self):
"""Test DataflowTemplateOperator instance is properly initialized."""
self.assertEqual(self.dataflow.task_id, TASK_ID)
+ self.assertEqual(self.dataflow.job_name, JOB_NAME)
self.assertEqual(self.dataflow.template, TEMPLATE)
self.assertEqual(self.dataflow.parameters, PARAMETERS)
self.assertEqual(self.dataflow.poll_sleep, POLL_SLEEP)
@@ -185,7 +192,7 @@ def test_exec(self, dataflow_mock):
'tempLocation': 'gs://test/temp',
'zone': 'us-central1-f'
}
- start_template_hook.assert_called_once_with(TASK_ID, expected_options,
+ start_template_hook.assert_called_once_with(JOB_NAME, expected_options,
PARAMETERS, TEMPLATE)
diff --git a/tests/contrib/operators/test_mlengine_operator_utils.py b/tests/contrib/operators/test_mlengine_operator_utils.py
index 0cb106da6b..09f0071e21 100644
--- a/tests/contrib/operators/test_mlengine_operator_utils.py
+++ b/tests/contrib/operators/test_mlengine_operator_utils.py
@@ -114,7 +114,7 @@ def testSuccessfulRun(self):
mock_dataflow_hook.assert_called_with(
gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10)
hook_instance.start_python_dataflow.assert_called_once_with(
- 'eval-test-summary',
+ '{{task.task_id}}',
{
'prediction_path': 'gs://legal-bucket/fake-output-path',
'labels': {'airflow-version': TEST_VERSION},
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services