Posted to dev@airflow.apache.org by Anton Mushin <An...@epam.com> on 2018/05/07 04:54:11 UTC

RE: Problem with SparkSubmit

Hi, Fokko
Thanks for your reply.

I'm using version 1.9.0.

-----Original Message-----
From: fokko@driesprongen.nl <fo...@driesprongen.nl> On Behalf Of Driesprong, Fokko
Sent: Saturday, April 28, 2018 10:54 PM
To: dev@airflow.incubator.apache.org
Subject: Re: Problem with SparkSubmit

Hi Anton,

Which version of Airflow are you running?

Cheers, Fokko

2018-04-27 10:24 GMT+02:00 Anton Mushin <An...@epam.com>:

> Hi all,
> I have a problem with the Spark operator. I get an exception:
>
> user@host:/# airflow test myDAG myTask 2018-04-26
> [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar 
> tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar 
> tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor 
> SequentialExecutor
> [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag 
> from /usr/local/airflow/dags
> [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to:
> sparkhost
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 27, in <module>
>     args.func(args)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", 
> line 528, in test
>     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", 
> line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", 
> line 1584, in run
>     session=session)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", 
> line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", 
> line 1493, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/
> operators/spark_submit_operator.py", line 145, in execute
>     self._hook.submit(self._application)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_su
> bmit_hook.py",
> line 231, in submit
>     **kwargs)
>   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
>     restore_signals, start_new_session)
>   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
>     restore_signals, start_new_session, preexec_fn)
> TypeError: Can't convert 'list' object to str implicitly
>
> My DAG looks like:
>
> import os
> from datetime import datetime, timedelta, date
>
> from airflow import DAG
> from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
>
> default_args = {
>     'owner': 'spark',
>     'depends_on_past': False,
>     'start_date': datetime.now(),
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1)
> }
>
> dag = DAG('myDAG', default_args=default_args)
>
> connection_id = "SPARK"
> os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'
>
> _config = {
>     'jars': 'spark_job.jar',
>     'executor_memory': '2g',
>     'name': 'myJob',
>     'conn_id': connection_id,
>     'java_class': 'org.Job'
> }
>
> operator = SparkSubmitOperator(
>     task_id='myTask',
>     dag=dag,
>     **_config
> )
>
> What is wrong? Could somebody help me with it?
>
>
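For reference, this particular TypeError is what Python 3.5's subprocess raises when an element of the command list is itself a list rather than a string, so it points at a malformed spark-submit command (as it turns out later in the thread, a connection-configuration mistake) rather than at Spark itself. A minimal sketch that reproduces the same message, assuming Python 3.5 (later versions word the error differently):

import subprocess

# Every element of the args list must be a string; the nested list below
# triggers "TypeError: Can't convert 'list' object to str implicitly" on 3.5.
subprocess.Popen(['echo', ['not', 'a', 'string']])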

Re: Problem with SparkSubmit

Posted by Arthur Wiedmer <ar...@gmail.com>.
No excuse necessary, we are glad you found the issue!

Best,
Arthur


RE: Problem with SparkSubmit

Posted by Anton Mushin <An...@epam.com>.
I found a mistake in my connection configuration.
The problem is no longer relevant. Excuse me for the trouble.

Best Regards,
Anton


RE: Problem with SparkSubmit

Posted by Anton Mushin <An...@epam.com>.
Hi, Fokko
Thanks for your help.

I got a new error:
ERROR - [Errno 2] No such file or directory: 'spark-submit'
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
    **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg)

In my case spark-submit isn't on the PATH, and I can't add it there. I can't find information on how to configure the SparkSubmitOperator for this case.
Could you help me? Should I set some os.environ variable to define the path to the spark-submit script?

Best Regards,
Anton
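One possible workaround, sketched under the assumption that the 1.9.0 SparkSubmitHook reads a 'spark-home' key from the connection's extra JSON (verify against the _resolve_connection method in your installed spark_submit_hook.py): put the Spark installation path in the SPARK connection's Extra field, so the hook invokes <spark-home>/bin/spark-submit directly instead of resolving spark-submit from PATH. Extras require the connection to live in the metadata database rather than in an AIRFLOW_CONN environment variable, for example:

from airflow import settings
from airflow.models import Connection

# '/opt/spark' is a hypothetical path; point it at your Spark installation.
session = settings.Session()
conn = session.query(Connection).filter(Connection.conn_id == 'SPARK').one()
conn.extra = '{"spark-home": "/opt/spark"}'
session.commit()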



Re: Problem with SparkSubmit

Posted by "Driesprong, Fokko" <fo...@driesprong.frl>.
Hi Anton,

I see the issue now. You're passing the jar to the jars argument, but that argument is for additional jars that need to be added to the Spark classpath, for example jars that provide UDFs. You need to pass your jar to the application argument.

_config = {
    'application': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class': 'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)

Hope this helps.

Cheers, Fokko
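For intuition, with the corrected _config the hook should end up handing subprocess a flat argument list along these lines (a sketch based on the spark-submit CLI and the config above; the exact flags and their order in SparkSubmitHook 1.9.0 may differ):

# Roughly the command list built for the corrected _config: 'spark_job.jar'
# is now the positional application argument instead of a --jars entry.
cmd = [
    'spark-submit',
    '--master', 'spark://sparkhost:7077',
    '--name', 'myJob',
    '--executor-memory', '2g',
    '--class', 'org.Job',
    'spark_job.jar',
]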
