Posted to issues@commons.apache.org by "Raj Prakash Kante (Jira)" <ji...@apache.org> on 2022/09/16 06:19:00 UTC

[jira] [Created] (COMMONSSITE-163) Xcom not returned while using 'BeamRunJavaPipelineOperator'.

Raj Prakash Kante created COMMONSSITE-163:
---------------------------------------------

             Summary: Xcom not returned while using 'BeamRunJavaPipelineOperator'.
                 Key: COMMONSSITE-163
                 URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
             Project: Apache Commons All
          Issue Type: Bug
            Reporter: Raj Prakash Kante


I was using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java jar that uses Dataflow to ingest data from Google Cloud Storage into BigQuery. The Dataflow job is submitted successfully, but I want to wait until it finishes successfully in the background before moving on to the next task. I plan to handle this with 'DataflowJobStatusSensor', which checks the status of the job in the background. The sensor needs the ID of the job to check. That ID is supposed to be returned as an XCom by 'BeamRunJavaPipelineOperator', but the operator does not return the expected XCom.
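The waiting step described above is essentially a poll loop over the job's state. The following is a framework-free sketch of that idea, not Airflow's implementation: 'wait_for_job' and 'get_state' are hypothetical names, 'get_state' stands in for whatever call fetches the job's current state, and the set of terminal states is an assumption based on Dataflow's job state names. It also shows why the sensor is useless without a valid job ID.

```python
import time
from typing import Callable, Iterable

# Assumed terminal Dataflow job states: once reached, the state no longer changes.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def wait_for_job(
    job_id: str,
    get_state: Callable[[str], str],
    expected_states: Iterable[str] = ("JOB_STATE_DONE",),
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state(job_id) until it returns one of expected_states.

    Raises ValueError for a missing job ID, RuntimeError if the job ends in
    another terminal state, and TimeoutError if the timeout elapses first.
    """
    # A missing XCom can end up in a rendered template as the string "None".
    if not job_id or job_id == "None":
        raise ValueError("no job ID to poll; was it pushed to XCom upstream?")
    expected = set(expected_states)
    waited = 0.0
    while True:
        state = get_state(job_id)
        if state in expected:
            return state
        if state in TERMINAL_STATES:
            raise RuntimeError(f"job {job_id} ended in {state}")
        if waited >= timeout:
            raise TimeoutError(f"job {job_id} still {state} after {timeout}s")
        sleep(poke_interval)
        waited += poke_interval
```

In Airflow itself, the sensor framework owns the poke interval and timeout; the sketch only makes the dependency on the job ID explicit.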

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

start_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline",
    runner='dataflow',
    jar="<path-to-java-jar>",
    pipeline_options={
        'airflowBucket': '<bucket-path>',
        'jobName': '<job-name>',
        'inputfileBucket': '<input-file-path>',
        'maxNumWorkers': '10',
        'targetTableProject': '<Project-name>',
        'datasetName': '<dataset-name>',
        'serviceAccount': '<service-account>',
        'runConfig': '<path-config-files>',
        'project': '<project-name>',
        'workerMachineType': 'n1-standard-2',
        'region': '<region>',
        'subnetwork': "<subnetwork>",
        'usePublicIps': 'false',
        'stagingLocation': '<staging-location>',
        'tempLocation': '<temp-location>',
    },
    job_class='<class-name-in-jar>',
    do_xcom_push=True,
    dag=dag,
)

# Wait until the Dataflow job reaches JOB_STATE_DONE before moving on.
wait_for_done = DataflowJobStatusSensor(
    task_id="wait-for-java-dataflow",
    job_id="{{ task_instance.xcom_pull('Get_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id="xxx-xx-xxx",
    gcp_conn_id='google_cloud_default',
    location='us-central1',
)

start_java_pipeline >> wait_for_done
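The sensor only works if the job ID actually reaches XCom. One way a launcher could capture that ID is by scanning the runner's console output. The sketch below assumes the Java Dataflow runner prints a line of the form 'Submitted job: <job-id>'; the pattern, the function name 'extract_dataflow_job_id', and the sample ID are hypothetical, and this is not necessarily how 'BeamRunJavaPipelineOperator' obtains the ID.

```python
import re
from typing import Iterable, Optional

# Assumption: the runner logs "Submitted job: <job-id>" once the job is created.
SUBMITTED_JOB = re.compile(r"Submitted job: (?P<job_id>[^\s\"]+)")


def extract_dataflow_job_id(log_lines: Iterable[str]) -> Optional[str]:
    """Return the first job ID found in the launcher's output, or None."""
    for line in log_lines:
        match = SUBMITTED_JOB.search(line)
        if match:
            return match.group("job_id")
    # No matching line: there is no ID to hand to downstream tasks.
    return None


sample_output = [
    "INFO: Staging files to the staging location",
    "INFO: Submitted job: 2022-09-15_23_19_00-123456789",  # hypothetical ID
    "INFO: To access the Dataflow monitoring console, ...",
]
print(extract_dataflow_job_id(sample_output))
```

If a step like this returns None, nothing useful can be pushed, and a downstream sensor has no job to check.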

Using "DataFlowJavaOperator", I am able to push the job ID to XCom and fetch it in "DataflowJobStatusSensor" without any issues, but that operator is deprecated.
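To make the expected hand-off concrete, here is a toy, in-memory model of XCom push and pull. It is not Airflow's code; 'ToyXComStore', the task IDs, and the sample job ID are hypothetical. It assumes the documented behavior that, with do_xcom_push=True, a task's return value is stored under the key 'return_value', and it models a task that returns nothing as pushing nothing. In that case the pull yields None, which a Jinja template renders as the string 'None' rather than a job ID.

```python
from typing import Any, Callable, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # key under which a task's returned value is stored


class ToyXComStore:
    """In-memory stand-in for Airflow's XCom storage (illustration only)."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def run_task(
        self,
        task_id: str,
        execute: Callable[[], Any],
        do_xcom_push: bool = True,
    ) -> None:
        # With do_xcom_push=True the returned value is stored under
        # RETURN_KEY; in this toy model a None return stores nothing.
        result = execute()
        if do_xcom_push and result is not None:
            self._values[(task_id, RETURN_KEY)] = result

    def xcom_pull(self, task_id: str, key: str = RETURN_KEY) -> Optional[Any]:
        # Missing entries come back as None.
        return self._values.get((task_id, key))


store = ToyXComStore()
# Hypothetical tasks: one returns a job ID, the other returns nothing.
store.run_task("old_java_operator", lambda: "2022-09-15_23_19_00-123456789")
store.run_task("start_java_pipeline", lambda: None)

print(store.xcom_pull("old_java_operator"))         # prints the job ID
print(str(store.xcom_pull("start_java_pipeline")))  # prints None
```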

--
This message was sent by Atlassian Jira
(v8.20.10#820010)