Posted to dev@airflow.apache.org by Ben Storrie <br...@gmail.com> on 2016/07/25 22:06:38 UTC
Custom Operator Issues
Hello,
Not sure if this is the correct place to ask, but I couldn't find anywhere
better. I'm trying to create a custom Spark operator that, at the moment,
accomplishes essentially the same thing as a BashOperator, but with some
additional features. Eventually it will not be a duplicate, but I cannot
get it working as is. Should this be done as a plugin, rather than as a
custom operator that inherits from BaseOperator?
I've attached the custom Spark operator and the DAG file for review, as
they are too large to include inline. The exception I receive when
attempting to run the DAG is the following:
[2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0),
    force=False, func=<function run at 0x7fbb9fbb6e60>, ignore_dependencies=False,
    ignore_depends_on_past=False, job_id=None, local=False, mark_success=False,
    pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
    subdir='/opt/spotx-hadoop-airflow/dags', task_id='run')
Sending to executor.
[2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016, 7, 25, 0, 0),
    force=False, func=<function run at 0x7fbd7b2f7e60>, ignore_dependencies=False,
    ignore_depends_on_past=False, job_id=None, local=True, mark_success=False,
    pickle=None, pool=None, raw=False, ship_dag=False, subcommand='run',
    subdir='DAGS_FOLDER/spark_test.py', task_id='run')
Traceback (most recent call last):
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow", line 16, in <module>
    args.func(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 206, in run
    dag = get_dag(args)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 73, in get_dag
    dagbag = DagBag(process_subdir(args.subdir))
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 166, in __init__
    self.collect_dags(dag_folder)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 385, in collect_dags
    self.process_file(dag_folder, only_if_updated=only_if_updated)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 292, in process_file
    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 341, in bag_dag
    dag.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2715, in resolve_template_files
    t.resolve_template_files()
  File "/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py", line 2033, in resolve_template_files
    content = getattr(self, attr)
AttributeError: 'SparkOperator' object has no attribute 's'
Many thanks,
Ben
Re: Custom Operator Issues
Posted by Maxime Beauchemin <ma...@gmail.com>.
`template_fields` should be a proper iterable (list or tuple). Change
`template_fields = 'spark_jar'` to `template_fields = ('spark_jar',)`
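The traceback makes the reason clear: Airflow iterates over `template_fields`, and iterating over a plain string yields its individual characters, so the first attribute it looks up is `'s'`. A minimal standalone sketch of the pitfall (the `spark_jar` field name comes from the original post):

```python
# A bare string is itself an iterable -- of characters.
template_fields_wrong = 'spark_jar'
print(list(template_fields_wrong))
# ['s', 'p', 'a', 'r', 'k', '_', 'j', 'a', 'r']
# Airflow would call getattr(task, 's') first, hence the
# AttributeError: 'SparkOperator' object has no attribute 's'

# A one-element tuple (note the trailing comma) yields the field name.
template_fields_right = ('spark_jar',)
print(list(template_fields_right))
# ['spark_jar']
```

The trailing comma matters: `('spark_jar')` without it is just a parenthesized string, not a tuple.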
Max