Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/31 12:21:43 UTC

[GitHub] [airflow] sschaetz opened a new pull request #13996: Add GCS timespan transform operator

sschaetz opened a new pull request #13996:
URL: https://github.com/apache/airflow/pull/13996


   This change adds a new GCS transform operator. It is based on the existing transform operator with the addition that it will transform multiple files that match the prefix and that were updated within a time-span. The time-span is implicitly defined: it is the time between the current execution timestamp of the DAG instance (time-span start) and the next execution timestamp of the DAG (time-span end). 
   
   The use case: some entity generates an undefined number of files at irregular intervals. The operator picks up all files that were updated since it last executed. Typically the transform script will iterate over the files, open them, extract some information, collate them into one or more files and upload them to GCS. These result files can then be loaded into BigQuery, processed further, or served via a webserver.
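
A minimal sketch of what such a transform script might look like, assuming the argument order given in the operator's docstring in this PR (local source location, local destination location, time-span start, time-span end). The function name `collate` and the output file name `collated.txt` are hypothetical and chosen only for illustration; a real script would extract and reshape data rather than concatenate it.

```python
import sys
from pathlib import Path


def collate(source_dir: Path, destination_dir: Path, start: str, end: str) -> Path:
    """Concatenate every downloaded file into a single output file."""
    destination_dir.mkdir(parents=True, exist_ok=True)
    output_file = destination_dir / "collated.txt"  # hypothetical output name
    # Collect the input files first, in a stable order.
    inputs = sorted(p for p in source_dir.rglob("*") if p.is_file())
    with output_file.open("w", encoding="utf-8") as out:
        # Record the time-span the operator passed in (ISO 8601 strings).
        out.write(f"# timespan {start} .. {end}\n")
        for path in inputs:
            out.write(path.read_text(encoding="utf-8"))
    return output_file


# Only run as a script when all four arguments are present.
if __name__ == "__main__" and len(sys.argv) >= 5:
    src, dst, span_start, span_end = sys.argv[1:5]
    collate(Path(src), Path(dst), span_start, span_end)
```

The sketch assumes the source and destination directories are distinct, so the output file is never picked up as an input.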


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] sschaetz commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580091943



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem are provided as the first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    the third and fourth arguments, as UTC ISO 8601 strings.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)

Review comment:
       Yep, will make that change!







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r567487850



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Google Cloud Storage Bucket operator."""
+import datetime
 import subprocess
 import sys
 import warnings
-from tempfile import NamedTemporaryFile
+from pathlib import Path
+from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Sequence, Union
 
+import dateutil.tz

Review comment:
       ```suggestion
   from airflow.settings import TIMEZONE
   ```
   I think using this would be better. What do you think?







[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787084376


   [The Workflow run](https://github.com/apache/airflow/actions/runs/605712692) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787099372


   [The Workflow run](https://github.com/apache/airflow/actions/runs/605770719) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] sschaetz commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-774528770


   @turbaszek since you implemented the original Transfer operator I wonder if you would be willing to take a look at this one?





[GitHub] [airflow] kaxil merged pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #13996:
URL: https://github.com/apache/airflow/pull/13996


   





[GitHub] [airflow] sschaetz commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r567723220



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Google Cloud Storage Bucket operator."""
+import datetime
 import subprocess
 import sys
 import warnings
-from tempfile import NamedTemporaryFile
+from pathlib import Path
+from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Sequence, Union
 
+import dateutil.tz

Review comment:
       Good question - when we compare against GCS modified time we want to use UTC (I think GCS reports in UTC). But maybe for the transfer function it would make more sense to get the timestamps in the local time. What do you think?
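
A small sketch of why the choice matters when comparing against UTC timestamps, using only the standard library. The UTC+2 zone and the date are hypothetical values for illustration: `replace(tzinfo=...)` relabels a datetime without converting it, while `astimezone(...)` converts it to the same instant in another zone. If the datetime is already in UTC, the two calls agree.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical local zone at UTC+2, used only for illustration.
local_zone = timezone(timedelta(hours=2))
execution_date = datetime(2021, 1, 31, 12, 0, tzinfo=local_zone)

# replace() swaps the tzinfo label and keeps the wall-clock fields,
# so the instant it denotes moves by the offset.
relabelled = execution_date.replace(tzinfo=timezone.utc)

# astimezone() keeps the instant and converts the wall-clock fields.
converted = execution_date.astimezone(timezone.utc)

print(relabelled.isoformat())  # 2021-01-31T12:00:00+00:00
print(converted.isoformat())   # 2021-01-31T10:00:00+00:00
```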







[GitHub] [airflow] sschaetz edited a comment on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz edited a comment on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-774528770


   @turbaszek since you implemented the original transform-operator I wonder if you would be willing to take a look at this one?





[GitHub] [airflow] turbaszek commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580053097



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem are provided as the first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    the third and fourth arguments, as UTC ISO 8601 strings.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            timespan_end = datetime.datetime.max
+            self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform
+                destination_file.parent.mkdir(parents=True, exist_ok=True)
+                source_hook.download(
+                    bucket_name=self.source_bucket,
+                    object_name=blob_to_transform,
+                    filename=str(destination_file),
+                )

Review comment:
       I'm not a fan of such operators. If a single download or any other operation in the `for` loop fails, the whole operation will fail. This is not the best way to ensure idempotency, I think. Also, downloading multiple files to a single temp dir may create some memory issues.
   







[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787076912


   [The Workflow run](https://github.com/apache/airflow/actions/runs/605632115) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787135581


   [The Workflow run](https://github.com/apache/airflow/actions/runs/606286579) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787077024


   [The Workflow run](https://github.com/apache/airflow/actions/runs/605624153) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] turbaszek commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580054086



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem are provided as the first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    the third and fourth arguments, as UTC ISO 8601 strings.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            timespan_end = datetime.datetime.max
+            self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform

Review comment:
       Nice, I didn't know that `/` will work in this way 👍
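
For reference, a short sketch of how the `/` operator joins path segments in `pathlib`. The directory and object name are hypothetical; `PurePosixPath` is used here only so the printed results are the same on every platform.

```python
from pathlib import PurePosixPath

temp_input_dir = PurePosixPath("/tmp/input")  # hypothetical directory
blob_name = "logs/2021/01/31/events.json"     # hypothetical object name

# A string on the right-hand side is split on "/" and appended.
destination_file = temp_input_dir / blob_name
print(destination_file)         # /tmp/input/logs/2021/01/31/events.json
print(destination_file.parent)  # /tmp/input/logs/2021/01/31

# Caveat: an absolute right-hand operand replaces the left-hand path.
escaped = temp_input_dir / "/etc/passwd"
print(escaped)                  # /etc/passwd
```

The caveat matters when the right-hand side comes from object names outside your control, since a leading slash would place the file outside the temporary directory.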







[GitHub] [airflow] sschaetz commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-783225701


   Capturing the outcome of the Slack conversation between @turbaszek and me. I will add:
   
   - a retry mechanism
   - a flag that lets users choose whether a failure raises an error or only logs a warning
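
   A rough sketch of what these two items could look like together. This is not the code from the PR: `call_with_retries`, `fail_on_error` and `delay_seconds` are hypothetical names, and catching `Exception` is a simplification (a real hook would presumably catch a narrower error type). `num_max_attempts` mirrors the parameter name that appears in the hook diff in this thread.

```python
import logging
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


def call_with_retries(
    func: Callable[[], T],
    num_max_attempts: int = 3,
    fail_on_error: bool = True,   # hypothetical flag: raise vs. warn-and-continue
    delay_seconds: float = 0.0,   # hypothetical pause between attempts
) -> Optional[T]:
    """Call func until it succeeds or the attempts are used up."""
    for attempt in range(1, num_max_attempts + 1):
        try:
            return func()
        except Exception as err:  # simplification; narrow this in real code
            log.warning("Attempt %d of %d failed: %s", attempt, num_max_attempts, err)
            if attempt == num_max_attempts:
                if fail_on_error:
                    raise
                return None
            time.sleep(delay_seconds)
    return None


attempts = {"count": 0}


def flaky_download() -> str:
    # Stand-in for a GCS call that fails twice before succeeding.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise OSError("transient failure")
    return "payload"


result = call_with_retries(flaky_download, num_max_attempts=3)
```

   With `fail_on_error=False` the helper returns `None` after the last failed attempt instead of raising, so the caller has to handle a missing result.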





[GitHub] [airflow] sschaetz commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r571474800



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Google Cloud Storage Bucket operator."""
+import datetime
 import subprocess
 import sys
 import warnings
-from tempfile import NamedTemporaryFile
+from pathlib import Path
+from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Sequence, Union
 
+import dateutil.tz

Review comment:
       Good point. I took this from an existing hook - but the next commit has it fixed in both spots.







[GitHub] [airflow] sschaetz commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580062200



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem is provided as an first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    third and fourth argument as UTC ISO 8601 string.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_prefix: str,
+        source_gcp_conn_id: str,
+        destination_bucket: str,
+        destination_prefix: str,
+        destination_gcp_conn_id: str,
+        transform_script: Union[str, List[str]],
+        source_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        destination_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_prefix = source_prefix
+        self.source_gcp_conn_id = source_gcp_conn_id
+        self.source_impersonation_chain = source_impersonation_chain
+
+        self.destination_bucket = destination_bucket
+        self.destination_prefix = destination_prefix
+        self.destination_gcp_conn_id = destination_gcp_conn_id
+        self.destination_impersonation_chain = destination_impersonation_chain
+
+        self.transform_script = transform_script
+        self.output_encoding = sys.getdefaultencoding()
+
+    def execute(self, context: dict) -> None:
+        # Define intervals and prefixes.
+        timespan_start = context["execution_date"]
+        timespan_end = context["dag"].following_schedule(timespan_start)
+        if timespan_end is None:
+            self.log.warning("No following schedule found, setting timespan end to max %s", timespan_end)
+            timespan_end = datetime.datetime.max
+
+        timespan_start = timespan_start.replace(tzinfo=timezone.utc)
+        timespan_end = timespan_end.replace(tzinfo=timezone.utc)
+
+        source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.source_prefix,
+            timespan_start,
+        )
+        destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
+            self.destination_prefix,
+            timespan_start,
+        )
+
+        source_hook = GCSHook(
+            gcp_conn_id=self.source_gcp_conn_id,
+            impersonation_chain=self.source_impersonation_chain,
+        )
+        destination_hook = GCSHook(
+            gcp_conn_id=self.destination_gcp_conn_id,
+            impersonation_chain=self.destination_impersonation_chain,
+        )
+
+        # Fetch list of files.
+        blobs_to_transform = source_hook.list_by_timespan(
+            bucket_name=self.source_bucket,
+            prefix=source_prefix_interp,
+            timespan_start=timespan_start,
+            timespan_end=timespan_end,
+        )
+
+        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
+            temp_input_dir = Path(temp_input_dir)
+            temp_output_dir = Path(temp_output_dir)
+
+            # TODO: download in parallel.
+            for blob_to_transform in blobs_to_transform:
+                destination_file = temp_input_dir / blob_to_transform
+                destination_file.parent.mkdir(parents=True, exist_ok=True)
+                source_hook.download(
+                    bucket_name=self.source_bucket,
+                    object_name=blob_to_transform,
+                    filename=str(destination_file),
+                )

Review comment:
       Thanks for looking at this in more detail.
   
   > This is not best way to assure idempotency I think
   
   I don't see a difference between multiple files and a single file with regard to idempotency. If a single download fails, the entire task fails and nothing is produced. As a follow-up I plan to add a retry mechanism for the GCS download and upload; this significantly increased the reliability of our production tasks that interact with GCS.
   
   > Also downloading multiple files to single temp dir may create some memory issues.
   
   Indeed, this can be an issue. The loop could be changed to `download single file -> process single file`, but then we cannot take advantage of parallel download, which is something I want to follow up with as well.
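
   To make the parallel-download idea concrete, here is a minimal sketch using a thread pool. It is an illustration under assumptions, not the planned implementation: `download_all`, `download_one`, `fake_download` and `max_workers` are hypothetical names, and the fake downloader stands in for the real GCS call so the snippet runs without credentials.

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Callable, Iterable, List


def download_all(
    blob_names: Iterable[str],
    target_dir: Path,
    download_one: Callable[[str, Path], None],
    max_workers: int = 4,
) -> List[Path]:
    """Download every blob into target_dir using a thread pool."""

    def fetch(blob_name: str) -> Path:
        destination_file = target_dir / blob_name
        destination_file.parent.mkdir(parents=True, exist_ok=True)
        download_one(blob_name, destination_file)
        return destination_file

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() drains the iterator, so a failed download raises here.
        return list(pool.map(fetch, blob_names))


def fake_download(blob_name: str, destination_file: Path) -> None:
    # Stand-in for a real GCS download; writes the blob name as the content.
    destination_file.write_text(blob_name)


with TemporaryDirectory() as temp_input_dir:
    paths = download_all(
        ["a/1.txt", "a/2.txt", "b/3.txt"], Path(temp_input_dir), fake_download
    )
    contents = [path.read_text() for path in paths]
```

   This keeps the all-or-nothing behaviour (any failed download fails the whole call), but it does not address the disk usage concern: all files still sit in the temporary directory at the same time.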







[GitHub] [airflow] turbaszek commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580090857



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem is provided as an first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    third and fourth argument as UTC ISO 8601 string.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):
+        """Interpolate prefix with datetime.
+
+        :param prefix: The prefix to interpolate
+        :type prefix: str
+        :param dt: The datetime to interpolate
+        :type dt: datetime
+
+        """
+        return None if prefix is None else dt.strftime(prefix)

Review comment:
       ```suggestion
           return dt.strftime(prefix) if prefix else None
   ```
   WDYT?
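
   One thing to keep in mind when comparing the two forms: they agree for `None` and for a normal prefix, but differ for an empty string. A small sketch of the difference; the function names and the sample prefix are invented for the comparison, and whether the difference matters depends on how the hook treats an empty prefix, which is not checked here.

```python
import datetime
from typing import Optional


def interpolate_is_none(prefix: Optional[str], dt: datetime.datetime) -> Optional[str]:
    # Form currently in the PR: only None short-circuits.
    return None if prefix is None else dt.strftime(prefix)


def interpolate_truthy(prefix: Optional[str], dt: datetime.datetime) -> Optional[str]:
    # Suggested form: any falsy prefix (None or "") short-circuits.
    return dt.strftime(prefix) if prefix else None


dt = datetime.datetime(2021, 1, 31, 12, 0, tzinfo=datetime.timezone.utc)

print(interpolate_is_none("data/%Y/%m/%d/", dt))  # data/2021/01/31/
print(interpolate_truthy("data/%Y/%m/%d/", dt))   # data/2021/01/31/
print(repr(interpolate_is_none("", dt)))          # ''
print(repr(interpolate_truthy("", dt)))           # None
```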







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r567743999



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -16,12 +16,15 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Google Cloud Storage Bucket operator."""
+import datetime
 import subprocess
 import sys
 import warnings
-from tempfile import NamedTemporaryFile
+from pathlib import Path
+from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Sequence, Union
 
+import dateutil.tz

Review comment:
       I guess `airflow.utils.timezone.utc` would be better for the GCS modified time then, just for uniformity.
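
   Related to the timezone handling: the operator's `execute` uses `.replace(tzinfo=timezone.utc)`, which relabels a datetime rather than converting it. A short sketch of the difference, using the standard library's `datetime.timezone.utc` as a stand-in for Airflow's UTC object (an assumption made so the snippet is self-contained). If the input is already in UTC, both calls give the same result.

```python
import datetime

utc = datetime.timezone.utc
cet = datetime.timezone(datetime.timedelta(hours=1))  # fixed +01:00 offset

local = datetime.datetime(2021, 1, 31, 12, 0, tzinfo=cet)

# replace() keeps the wall-clock time and swaps the label: a different instant.
relabelled = local.replace(tzinfo=utc)

# astimezone() keeps the instant and adjusts the wall-clock time.
converted = local.astimezone(utc)

print(relabelled.isoformat())  # 2021-01-31T12:00:00+00:00
print(converted.isoformat())   # 2021-01-31T11:00:00+00:00
```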







[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-787099259


   [The Workflow run](https://github.com/apache/airflow/actions/runs/605869915) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r580091927



##########
File path: airflow/providers/google/cloud/operators/gcs.py
##########
@@ -656,6 +659,216 @@ def execute(self, context: dict) -> None:
             )
 
 
+class GCSTimeSpanFileTransformOperator(BaseOperator):
+    """
+    Determines a list of objects that were added or modified at a GCS source
+    location during a specific time-span, copies them to a temporary location
+    on the local file system, runs a transform on this file as specified by
+    the transformation script and uploads the output to the destination bucket.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GCSTimeSpanFileTransformOperator`
+
+    The locations of the source and the destination files in the local
+    filesystem is provided as an first and second arguments to the
+    transformation script. The time-span is passed to the transform script as
+    third and fourth argument as UTC ISO 8601 string.
+
+    The transformation script is expected to read the
+    data from source, transform it and write the output to the local
+    destination file.
+
+    :param source_bucket: The bucket to fetch data from. (templated)
+    :type source_bucket: str
+    :param source_prefix: Prefix string which filters objects whose name begin with
+           this prefix. Can interpolate execution date and time components. (templated)
+    :type source_prefix: str
+    :param source_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to download files to be processed.
+    :type source_gcp_conn_id: str
+    :param source_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to download files to be processed), or chained list of accounts required to
+        get the access_token of the last account in the list, which will be impersonated in the
+        request. If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type source_impersonation_chain: Union[str, Sequence[str]]
+
+    :param destination_bucket: The bucket to write data to. (templated)
+    :type destination_bucket: str
+    :param destination_prefix: Prefix string for the upload location.
+        Can interpolate execution date and time components. (templated)
+    :type destination_prefix: str
+    :param destination_gcp_conn_id: The connection ID to use connecting to Google Cloud
+           to upload processed files.
+    :type destination_gcp_conn_id: str
+    :param destination_impersonation_chain: Optional service account to impersonate using short-term
+        credentials (to upload processed files), or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type destination_impersonation_chain: Union[str, Sequence[str]]
+
+    :param transform_script: location of the executable transformation script or list of arguments
+        passed to subprocess ex. `['python', 'script.py', 10]`. (templated)
+    :type transform_script: Union[str, List[str]]
+
+    """
+
+    template_fields = (
+        'source_bucket',
+        'source_prefix',
+        'destination_bucket',
+        'destination_prefix',
+        'transform_script',
+        'source_impersonation_chain',
+        'destination_impersonation_chain',
+    )
+
+    @staticmethod
+    def interpolate_prefix(prefix, dt):

Review comment:
       ```suggestion
       def interpolate_prefix(prefix: Optional[str], dt: datetime.datetime) -> Optional[str]:
   ```
   It would be nice to have the type annotations as this improves the readability of the code







[GitHub] [airflow] turbaszek commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r584282973



##########
File path: airflow/providers/google/cloud/hooks/gcs.py
##########
@@ -285,20 +290,43 @@ def download(
         :type chunk_size: int
         :param timeout: Request timeout in seconds.
         :type timeout: int
+        :param num_max_attempts: Number of attempts to download the file.
+        :type num_max_attempts: int
         """
         # TODO: future improvement check file size before downloading,
         #  to check for local space availability
 
-        client = self.get_conn()
-        bucket = client.bucket(bucket_name)
-        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
-
-        if filename:
-            blob.download_to_filename(filename, timeout=timeout)
-            self.log.info('File downloaded to %s', filename)
-            return filename
-        else:
-            return blob.download_as_string()
+        num_file_attempts = 0
+
+        while num_file_attempts < num_max_attempts:
+            try:
+                num_file_attempts += 1
+                client = self.get_conn()
+                bucket = client.bucket(bucket_name)
+                blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
+
+                if filename:
+                    blob.download_to_filename(filename, timeout=timeout)
+                    self.log.info('File downloaded to %s', filename)
+                    return filename
+                else:
+                    return blob.download_as_string()
+
+            except GoogleCloudError as e:

Review comment:
       ```suggestion
               except GoogleCloudError as err:
   ```
   `e` is not allowed by pylint







[GitHub] [airflow] github-actions[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-798914137


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-770373824


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, it's a heavy Docker image but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#issuecomment-799487119


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] sschaetz commented on a change in pull request #13996: Add GCS timespan transform operator

Posted by GitBox <gi...@apache.org>.
sschaetz commented on a change in pull request #13996:
URL: https://github.com/apache/airflow/pull/13996#discussion_r593903447



##########
File path: airflow/providers/google/cloud/hooks/gcs.py
##########
@@ -285,20 +290,43 @@ def download(
         :type chunk_size: int
         :param timeout: Request timeout in seconds.
         :type timeout: int
+        :param num_max_attempts: Number of attempts to download the file.
+        :type num_max_attempts: int
         """
         # TODO: future improvement check file size before downloading,
         #  to check for local space availability
 
-        client = self.get_conn()
-        bucket = client.bucket(bucket_name)
-        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
-
-        if filename:
-            blob.download_to_filename(filename, timeout=timeout)
-            self.log.info('File downloaded to %s', filename)
-            return filename
-        else:
-            return blob.download_as_string()
+        num_file_attempts = 0
+
+        while num_file_attempts < num_max_attempts:
+            try:
+                num_file_attempts += 1
+                client = self.get_conn()
+                bucket = client.bucket(bucket_name)
+                blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
+
+                if filename:
+                    blob.download_to_filename(filename, timeout=timeout)
+                    self.log.info('File downloaded to %s', filename)
+                    return filename
+                else:
+                    return blob.download_as_string()
+
+            except GoogleCloudError as e:

Review comment:
       Thanks - fixed now!



