Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/04/24 04:41:15 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #5166: [AIRFLOW-4397] Add GCSUploadSessionCompleteSensor

URL: https://github.com/apache/airflow/pull/5166#discussion_r277952601
 
 

 ##########
 File path: airflow/contrib/sensors/gcs_sensor.py
 ##########
 @@ -160,3 +162,105 @@ def poke(self, context):
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
         return bool(hook.list(self.bucket, prefix=self.prefix))
+
+
+class GoogleCloudStorageUploadSessionCompleteSensor(BaseSensorOperator):
+    """
+    Checks for changes in the number of files at prefix in Google Cloud Storage
+    bucket and returns True if the inactivity period has passed with no
+    increase in the number of files. Note, it is recommended to use reschedule
+    mode if you expect this sensor to run for hours.
+
+    :param bucket: The Google cloud storage bucket where the objects are
+        expected.
+    :type bucket: str
+    :param prefix: The name of the prefix to check in the Google cloud
+        storage bucket.
+    :param inactivity_period: The total seconds of inactivity to designate
+        an upload session is over. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional files sensed.
+    :type inactivity_period: int
+    :param min_files: The minimum number of files needed for upload session
+        to be considered valid.
+    :type min_files: int
+    :param previous_num_files: The previous number of files before the next
+        iteration.
+    :type previous_num_files: int
+    :param inactivity_seconds: The current seconds of the inactivity period.
+    :type inactivity_seconds: int
+    :param google_cloud_conn_id: The connection ID to use when connecting
+        to Google cloud storage.
+    :type google_cloud_conn_id: str
+    :param delegate_to: The account to impersonate, if any. For this to work,
+        the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = ('bucket', 'prefix')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix,
+                 inactivity_period=60 * 60,
+                 min_files=1,
+                 previous_num_files=0,
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args, **kwargs):
+
+        super(GoogleCloudStorageUploadSessionCompleteSensor, self).__init__(*args, **kwargs)
+
+        self.bucket = bucket
+        self.prefix = prefix
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.inactivity_period = inactivity_period
+        self.min_files = min_files
+        self.previous_num_files = previous_num_files
+        self.inactivity_seconds = 0
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+
+    def is_bucket_updated(self, current_num_files):
+        """
+        Checks whether new files have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param current_num_files: number of files in bucket during last poke.
+        :type current_num_files: int
+        """
+
+        if current_num_files > self.previous_num_files:
+            # When new files arrive, reset inactivity_seconds and update
+            # previous_num_files for the next poke.
+            self.inactivity_seconds = 0
+            self.previous_num_files = current_num_files
+        else:
+            self.inactivity_seconds = self.inactivity_seconds \
+                + self.poke_interval
 
 Review comment:
   I think it would be better to compute the inactivity seconds from the actual system time (current_time - last_activity_time). Accumulating poke_interval, especially over multiple pokes, does not account for potential scheduling delays: there is no guarantee that "is_bucket_updated" will always be called exactly poke_interval seconds after the previous call. This makes unit testing a bit more difficult, but it can be done by mocking the current time.
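
A minimal sketch of the time-based approach suggested above, not the PR's actual implementation. The class name `InactivityTracker`, the method name `upload_session_complete`, and the injectable `clock` argument are hypothetical names introduced for illustration. It also assumes that the session counts as complete only once `min_files` files are present, which the quoted hunk does not show.

```python
import time


class InactivityTracker:
    """Hypothetical helper: measures inactivity by elapsed clock time
    instead of summing poke_interval on every poke."""

    def __init__(self, inactivity_period, min_files=1,
                 previous_num_files=0, clock=time.monotonic):
        self.inactivity_period = inactivity_period
        self.min_files = min_files
        self.previous_num_files = previous_num_files
        # The clock is injectable so tests can pass a fake instead of
        # patching the time module (an assumption, not from the PR).
        self.clock = clock
        self.last_activity_time = None

    def upload_session_complete(self, current_num_files):
        now = self.clock()
        if self.last_activity_time is None:
            # First poke: start measuring inactivity from here.
            self.last_activity_time = now

        if current_num_files > self.previous_num_files:
            # New files arrived: record the activity time and the new count.
            self.last_activity_time = now
            self.previous_num_files = current_num_files
            return False

        # Elapsed time since the last observed activity, however late
        # or irregularly this method happens to be called.
        inactive_for = now - self.last_activity_time
        return (inactive_for >= self.inactivity_period
                and current_num_files >= self.min_files)
```

Because the elapsed time is read from the clock on every call, a poke that is delayed by the scheduler still sees the true inactivity duration. A test can pass a fake clock, for example `clock=lambda: next(ticks)` over an iterator of timestamps, to control the time precisely.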
