Posted to dev@airflow.apache.org by Ben Storrie <br...@gmail.com> on 2016/07/25 22:06:38 UTC

Custom Operator Issues

Hello,

Not sure if this is the correct place to ask, but I couldn't find anywhere
better. I'm trying to create a custom Spark operator that, at the moment,
basically accomplishes the same thing as a BashOperator, but with some
additional features. Eventually it will not be a duplicate, but I cannot
get it working as is. Should this be done as a plugin, rather than as a
custom operator that inherits from BaseOperator?

I've attached the custom Spark operator and the DAG file for review, as
they are too large to include inline. The exception I receive when
attempting to run the DAG is the following:

[2016-07-25 21:53:24,302] {__init__.py:36} INFO - Using executor
LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016,
7, 25, 0, 0), force=False, func=<function run at 0x7fbb9fbb6e60>,
ignore_dependencies=False, ignore_depends_on_past=False, job_id=None,
local=False, mark_success=False, pickle=None, pool=None, raw=False,
ship_dag=False, subcommand='run', subdir='/opt/spotx-hadoop-airflow/dags',
task_id='run')
Sending to executor.
[2016-07-25 21:53:24,983] {__init__.py:36} INFO - Using executor
LocalExecutor
Namespace(dag_id='spark_operator_2', execution_date=datetime.datetime(2016,
7, 25, 0, 0), force=False, func=<function run at 0x7fbd7b2f7e60>,
ignore_dependencies=False, ignore_depends_on_past=False, job_id=None,
local=True, mark_success=False, pickle=None, pool=None, raw=False,
ship_dag=False, subcommand='run', subdir='DAGS_FOLDER/spark_test.py',
task_id='run')
Traceback (most recent call last):
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/bin/airflow",
line 16, in <module>
    args.func(args)
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
line 206, in run
    dag = get_dag(args)
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/bin/cli.py",
line 73, in get_dag
    dagbag = DagBag(process_subdir(args.subdir))
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 166, in __init__
    self.collect_dags(dag_folder)
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 385, in collect_dags
    self.process_file(dag_folder, only_if_updated=only_if_updated)
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 292, in process_file
    self.bag_dag(dag, parent_dag=dag, root_dag=dag)
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 341, in bag_dag
    dag.resolve_template_files()
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 2715, in resolve_template_files
    t.resolve_template_files()
  File
"/opt/spotx-miniconda/miniconda/envs/spotx-hadoop-airflow/lib/python2.7/site-packages/airflow/models.py",
line 2033, in resolve_template_files
    content = getattr(self, attr)
AttributeError: 'SparkOperator' object has no attribute 's'

Many thanks,
Ben

Re: Custom Operator Issues

Posted by Maxime Beauchemin <ma...@gmail.com>.
`template_fields` should be a proper iterable (a list or tuple). Change
`template_fields = 'spark_jar'` to `template_fields = ('spark_jar',)`.
A bare string is itself iterable, character by character, so Airflow's
template resolution ends up calling `getattr(self, 's')` for the first
character of 'spark_jar' -- which is exactly the AttributeError in the
traceback above.
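
For reference, a minimal sketch of what such an operator might look like
with the fix in place. This is illustrative only, not Ben's attached code:
the constructor argument and the spark-submit call are assumptions, and
the `apply_defaults` import path varies between Airflow versions.

    from subprocess import check_call

    from airflow.models import BaseOperator
    from airflow.utils import apply_defaults  # airflow.utils.decorators in newer releases


    class SparkOperator(BaseOperator):
        # Must be a list or tuple. A bare string such as 'spark_jar' is
        # iterated character by character, so template resolution would
        # look up attributes 's', 'p', 'a', ... instead of 'spark_jar'.
        template_fields = ('spark_jar',)

        @apply_defaults
        def __init__(self, spark_jar, *args, **kwargs):
            super(SparkOperator, self).__init__(*args, **kwargs)
            self.spark_jar = spark_jar

        def execute(self, context):
            # Shell out to spark-submit, much as a BashOperator would.
            check_call(['spark-submit', self.spark_jar])

With the tuple in place, `spark_jar` can contain Jinja templating (for
example {{ ds }}) and it will be rendered before execute() runs.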

Max
