Posted to commits@airflow.apache.org by "Fokko Driesprong (JIRA)" <ji...@apache.org> on 2018/03/01 09:16:00 UTC

[jira] [Commented] (AIRFLOW-2124) Allow local mainPythonFileUri

    [ https://issues.apache.org/jira/browse/AIRFLOW-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381714#comment-16381714 ] 

Fokko Driesprong commented on AIRFLOW-2124:
-------------------------------------------

We would like to integrate this into the DataProcOperator. We don't want to have additional steps. We'll develop something internally that takes care of this and then push it back to Airflow. Cheers

> Allow local mainPythonFileUri
> -----------------------------
>
>                 Key: AIRFLOW-2124
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2124
>             Project: Apache Airflow
>          Issue Type: Wish
>            Reporter: robbert van waardhuizen
>            Assignee: Fokko Driesprong
>            Priority: Major
>
> For our workflow, we are currently transitioning from the BashOperator to the DataProcPySparkOperator. While rewriting the DAG, we came to the conclusion that it is not possible to submit a (local) path as the main Python file; a Hadoop Compatible Filesystem (HCFS) URI is required.
> Our main Python drivers are located in a Git repository. Putting them in a GCS bucket would require manually updating/overwriting these files.
> In terms of code, this works using the BashOperator:
> {code:bash}
> gcloud dataproc jobs submit pyspark \
>     /usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py \
>     --cluster {cluster_name}
> {code}
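> For reference, wrapping that command in a BashOperator task looks roughly like this (a minimal sketch; the task id is illustrative, and the {{cluster_name}} variable and {{dag}} object are assumed to exist in the surrounding DAG file):
> {code:python}
> from airflow.operators.bash_operator import BashOperator
>
> submit_main_driver = BashOperator(
>     task_id='submit_main_driver',  # illustrative task id
>     bash_command=(
>         'gcloud dataproc jobs submit pyspark '
>         '/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py '
>         '--cluster {{ params.cluster_name }}'
>     ),
>     params={'cluster_name': cluster_name},
>     dag=dag,
> )
> {code}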
> This cannot be replicated using the DataProcPySparkOperator:
> {code:python}
> DataProcPySparkOperator(main="/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py",
>                         cluster_name=cluster_name)
> {code}
> Error:
> {code:java}
> =========== Cloud Dataproc Agent Error ===========
> java.lang.NullPointerException
> at sun.nio.fs.UnixPath.normalizeAndCheck(UnixPath.java:77)
> at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
> at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler.registerResourceForDownload(AbstractJobHandler.java:442)
> at com.google.cloud.hadoop.services.agent.job.PySparkJobHandler.buildCommand(PySparkJobHandler.java:93)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:538)
> at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:532)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:127)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
> at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> ======== End of Cloud Dataproc Agent Error ========
> {code}
> What would be best practice in this case?
> Is it possible to add the ability to submit local paths as main Python file?
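> One possible interim workaround, until local paths are supported, is to stage the driver into GCS from within the DAG and pass the resulting gs:// URI to the operator. A minimal sketch (the staging bucket, object path, and task wiring are illustrative; GoogleCloudStorageHook.upload is the existing contrib hook):
> {code:python}
> from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
> from airflow.operators.python_operator import PythonOperator
>
> LOCAL_MAIN = '/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py'
> BUCKET = 'my-dataproc-staging'  # illustrative bucket name
> OBJECT = 'jobs/main_python_driver.py'
>
> def stage_main_driver():
>     # Upload the local driver so Dataproc can fetch it over HCFS (gs://).
>     GoogleCloudStorageHook().upload(bucket=BUCKET, object=OBJECT, filename=LOCAL_MAIN)
>
> stage = PythonOperator(task_id='stage_main_driver',
>                        python_callable=stage_main_driver,
>                        dag=dag)
>
> # The submit task can then use an HCFS URI:
> # DataProcPySparkOperator(main='gs://{}/{}'.format(BUCKET, OBJECT),
> #                         cluster_name=cluster_name)
> {code}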



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)