Posted to issues@commons.apache.org by "Gary D. Gregory (Jira)" <ji...@apache.org> on 2022/09/16 14:28:00 UTC

[jira] [Commented] (COMMONSSITE-163) Xcom not returned while using 'BeamRunJavaPipelineOperator'.

    [ https://issues.apache.org/jira/browse/COMMONSSITE-163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605854#comment-17605854 ] 

Gary D. Gregory commented on COMMONSSITE-163:
---------------------------------------------

Why is this ticket created for the web site? You need to pick the appropriate project.

> Xcom not returned while using 'BeamRunJavaPipelineOperator'.
> ------------------------------------------------------------
>
>                 Key: COMMONSSITE-163
>                 URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
>             Project: Apache Commons All
>          Issue Type: Bug
>            Reporter: Raj Prakash Kante
>            Priority: Major
>
> I was using 'BeamRunJavaPipelineOperator' to run a Java jar that ingests data from Google Cloud Storage into BigQuery using Dataflow, from an Airflow DAG. The Dataflow job is submitted successfully, but I want to wait until it completes successfully in the background before moving on to the next task. I am planning to handle this with 'DataflowJobStatusSensor', which checks the status of the job in the background. The sensor requires the ID of the job to check, which is supposed to be returned as an XCom by 'BeamRunJavaPipelineOperator', but the operator does not return the desired XCom.
>  
>  
>  
> start_java_pipeline = BeamRunJavaPipelineOperator(
>     task_id="start_java_pipeline",
>     runner='dataflow',
>     jar="<path-to-java-jar>",
>     pipeline_options={
>         'airflowBucket': '<bucket-path>',
>         'jobName': '<job-name>',
>         'inputfileBucket': '<input-file-path>',
>         'maxNumWorkers': '10',
>         'targetTableProject': '<Project-name>',
>         'datasetName': '<dataset-name>',
>         'serviceAccount': '<service-account>',
>         'runConfig': '<path-config-files>',
>         'project': '<project-name>',
>         'workerMachineType': 'n1-standard-2',
>         'region': '<region>',
>         'subnetwork': "<subnetwork>",
>         'usePublicIps': 'false',
>         'stagingLocation': '<Staging-location>',
>         'tempLocation': '<temp-location>'
>     },
>     job_class='<class-name-in-jar>',
>     do_xcom_push=True,
>     dag=dag)
>  
>  
> wait_for_done = DataflowJobStatusSensor(
>     task_id="wait-for-java-dataflow",
>     job_id="{{task_instance.xcom_pull('Get_job_id')}}",
>     expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
>     project_id="xxx-xx-xxx",
>     gcp_conn_id='google_cloud_default',
>     location='us-central1',
> )
> start_java_pipeline >> wait_for_done
>  
> Using "DataFlowJavaOperator" I am able to push the job ID to XCom and fetch it with "DataflowJobStatusSensor" without any issues, but this operator is deprecated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)