Posted to issues@commons.apache.org by "Raj Prakash Kante (Jira)" <ji...@apache.org> on 2022/09/16 06:19:00 UTC
[jira] [Created] (COMMONSSITE-163) Xcom not returned while using 'BeamRunJavaPipelineOperator'.
Raj Prakash Kante created COMMONSSITE-163:
---------------------------------------------
Summary: Xcom not returned while using 'BeamRunJavaPipelineOperator'.
Key: COMMONSSITE-163
URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
Project: Apache Commons All
Issue Type: Bug
Reporter: Raj Prakash Kante
I was using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java JAR that uses Dataflow to ingest data from Google Cloud Storage into BigQuery. The Dataflow job is submitted successfully, but I want to wait until it finishes successfully in the background before moving on to the next task. I plan to handle this with 'DataflowJobStatusSensor', which checks the status of the job in the background. The sensor needs the ID of the job to check. That ID is supposed to be returned as an XCom by 'BeamRunJavaPipelineOperator', but the operator does not return the desired XCom.
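from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

# Submits the Java pipeline to Dataflow; `dag` is defined elsewhere in the DAG file.
start_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline",
    runner='dataflow',
    jar="<path-to-java-jar>",
    pipeline_options={
        'airflowBucket': '<bucket-path>',
        'jobName': '<job-name>',
        'inputfileBucket': '<input-file-path>',
        'maxNumWorkers': '10',
        'targetTableProject': '<Project-name>',
        'datasetName': '<dataset-name>',
        'serviceAccount': '<service-account>',
        'runConfig': '<path-config-files>',
        'project': '<project-name>',
        'workerMachineType': 'n1-standard-2',
        'region': '<region>',
        'subnetwork': "<subnetwork>",
        'usePublicIps': 'false',
        'stagingLocation': '<staging-location>',
        'tempLocation': '<temp-location>',
    },
    job_class='<class-name-in-jar>',
    do_xcom_push=True,  # expected to push the Dataflow job ID as an XCom
    dag=dag)

# Waits until the Dataflow job reaches JOB_STATE_DONE.
wait_for_done = DataflowJobStatusSensor(
    task_id="wait-for-java-dataflow",
    # Pulls the return value of the task 'Get_job_id' (not shown in this snippet).
    job_id="{{ task_instance.xcom_pull('Get_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id="xxx-xx-xxx",
    gcp_conn_id='google_cloud_default',
    location='us-central1',
)

start_java_pipeline >> wait_for_done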
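The wait_for_done task relies on the sensor re-checking the job state until it reaches one of the expected statuses. Below is a simplified sketch of that polling pattern, assuming a hypothetical get_job_state callable in place of the real Dataflow API lookup. It is not the provider's implementation, and TERMINAL_STATES is only a subset of Dataflow's terminal states chosen for illustration. It also fails fast on a job_id of "None", which is what an empty XCom pull can leave behind in a rendered template.

```python
import time
from typing import Callable, Iterable

# A subset of Dataflow's terminal job states (assumed for this sketch).
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_job(
    get_job_state: Callable[[str], str],
    job_id: str,
    expected_statuses: Iterable[str],
    poke_interval: float = 0.0,
    max_pokes: int = 100,
) -> str:
    """Poll get_job_state(job_id) until it reports one of expected_statuses.

    get_job_state is a hypothetical stand-in for a Dataflow API lookup.
    Raises if the job ends in another terminal state or the poke budget runs out.
    """
    expected = set(expected_statuses)
    if not job_id or job_id == "None":
        # An empty or unrendered XCom value leaves nothing to poll.
        raise ValueError(f"invalid job_id: {job_id!r}")
    for _ in range(max_pokes):
        state = get_job_state(job_id)
        if state in expected:
            return state
        if state in TERMINAL_STATES:
            raise RuntimeError(f"job {job_id} ended in unexpected state {state}")
        time.sleep(poke_interval)
    raise TimeoutError(f"job {job_id} not in {expected} after {max_pokes} pokes")


if __name__ == "__main__":
    # Hypothetical job ID and state sequence, for illustration only.
    states = iter(["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_DONE"])
    print(wait_for_job(lambda _id: next(states), "example-job-id", {"JOB_STATE_DONE"}))
    # prints JOB_STATE_DONE
```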
With "DataFlowJavaOperator" I am able to push the job ID to XCom and fetch it in "DataflowJobStatusSensor" without any issues, but that operator is deprecated.
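To illustrate the XCom mechanics involved, here is a toy in-memory model, assuming a hypothetical XComStore class and run_task helper rather than Airflow's real XCom backend. A task's returned value is stored under the key "return_value" for that task's ID, so pulling from a task whose execute step returned nothing, or from a task ID that never pushed anything (for example 'Get_job_id' when the Beam task is 'start_java_pipeline'), yields None.

```python
from typing import Any, Callable, Dict, Optional, Tuple

RETURN_VALUE_KEY = "return_value"  # key Airflow uses for a task's returned value


class XComStore:
    """Toy in-memory stand-in for Airflow's XCom table (hypothetical)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = RETURN_VALUE_KEY) -> None:
        self._data[(task_id, key)] = value

    def pull(self, task_id: str, key: str = RETURN_VALUE_KEY) -> Optional[Any]:
        # Missing entries come back as None instead of raising.
        return self._data.get((task_id, key))


def run_task(
    store: XComStore,
    task_id: str,
    execute: Callable[[], Any],
    do_xcom_push: bool = True,
) -> None:
    """Run execute() and, loosely mirroring do_xcom_push, store a non-None result."""
    result = execute()
    if do_xcom_push and result is not None:
        store.push(task_id, result)


if __name__ == "__main__":
    store = XComStore()
    # An execute step that returns nothing leaves no XCom to pull.
    run_task(store, "start_java_pipeline", lambda: None)
    print(store.pull("start_java_pipeline"))  # prints None
    # An execute step that returns a (hypothetical) job ID makes it pullable.
    run_task(store, "start_java_pipeline", lambda: "example-job-id")
    print(store.pull("start_java_pipeline"))  # prints example-job-id
    print(store.pull("Get_job_id"))  # prints None: that task never pushed
```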
--
This message was sent by Atlassian Jira
(v8.20.10#820010)