You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/30 14:58:45 UTC

[GitHub] [airflow] turbaszek commented on a change in pull request #9590: Improve idempotency of BigQueryInsertJobOperator

turbaszek commented on a change in pull request #9590:
URL: https://github.com/apache/airflow/pull/9590#discussion_r447751434



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1692,32 +1692,52 @@ def prepare_template(self) -> None:
             with open(self.configuration, 'r') as file:
                 self.configuration = json.loads(file.read())
 
+    def _submit_job(self, hook: BigQueryHook, job_id: str):
+        # Submit a new job
+        job = hook.insert_job(
+            configuration=self.configuration,
+            project_id=self.project_id,
+            location=self.location,
+            job_id=job_id,
+        )
+        # Start the job and wait for it to complete and get the result.
+        job.result()
+        return job
+
     def execute(self, context: Any):
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
         )
 
-        job_id = self.job_id or f"airflow_{self.task_id}_{int(time())}"
+        exec_date = context['execution_date'].isoformat()
+        job_id = self.job_id or f"airflow_{self.dag_id}_{self.task_id}_{exec_date}"
+
         try:
-            job = hook.insert_job(
-                configuration=self.configuration,
-                project_id=self.project_id,
-                location=self.location,
-                job_id=job_id,
-            )
-            # Start the job and wait for it to complete and get the result.
-            job.result()
+            # Submit a new job
+            job = self._submit_job(hook, job_id)
         except Conflict:
+            # If the job already exists retrieve it
             job = hook.get_job(
                 project_id=self.project_id,
                 location=self.location,
                 job_id=job_id,
             )
-            # Get existing job and wait for it to be ready
-            for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-                sleep(time_to_wait)
-                job.reload()
-                if job.done():
-                    break
+
+            if job.done() and job.error_result:
+                # The job exists and finished with an error and we are probably rerunning it
+                # So we have to make a new job_id because it has to be unique
+                job_id = f"{self.job_id}_{int(time())}"

Review comment:
   ```suggestion
                   job_id = f"{job_id}_{int(time())}"
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org